【翻译】弹性分布式数据集:基于内存的集群计算的容错性抽象

context

大概用了16个小时完成了这篇关于RDD论文的翻译,这篇论文奠定了Spark的设计基础。

原文:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

推荐先看林子雨老师关于RDD的解释

摘要

本文提出了一个分布式内存抽象的概念–弹性分布式数据集(Resilient Distributed Datasets,以下称为RDDs),其能够让开发人员以容错的方式在大规模集群上进行基于内存的计算。RDDs的提出是由于现有的两种计算框架并不高效:迭代式算法和交互式数据挖掘工具。在内存中操作数据可以将前两种计算方式的效率提高一个数量级。RDDs提供了一种受限的共享内存,是基于粗粒度的转换操作而不是细粒度的状态同步。尽管如此,RDDs依然能够表示多种类型的计算,包括专用的迭代编程模型(如Pregel)和一些新的应用模型。我们通过在Spark上评估各种应用和基准,实现了RDDs。

1 引言

集群计算框架,如MapReduce[10]和Dryad[19],已经被广泛地运用于大规模数据分析。这些系统能够让用户在不用考虑任务调度和容错的前提下,使用一系列高级的操作进行并行计算。

虽然这些框架为获取集权计算资源提供了大量的抽象,但是缺少对分布式内存的运用。这使他们对于一类新兴的应用十分低效:它们在不同的计算阶段重用中间结果。数据重用在迭代式机器学习和图算法中十分常见,包括PageRank、K-means聚类和逻辑回归。另一个明显的用例是交互式数据挖掘,用户在同样的数据子集上进行ad-hoc查询。然而不好的是,对于现在的框架,在不同计算阶段之间重用数据(如,在两个MapReduce的job之间)的唯一方式是将其写入外部稳定存储系统中,如,分布式文件系统。由于数据的复制、硬盘I/O和序列化,导致了大量的成本开销,并占据了应用运行的大部分时间。

在意识到这个问题之后,研究人员针对需要数据重用的应用开发了专门的框架。比如,迭代式图计算系统Pregel[22],其将中间数据存放在内存中,而HaLoop[7]提供了一种迭代式MapReduce接口。无论如何,这些框架仅支持特定的计算模式(如,循环一系列的MapReduce步骤),并隐式地提供这些模式的数据共享。它们没有提供更加通用的数据重用的抽象,如,让用户直接向内存中装载数据集,并对其进行ad-hoc查询。

在这篇论文中,我们提出了一种能够广泛运用于各种应用中高效的数据重用抽象,弹性分布式数据集(RDDs)。RDDs是一个容错的、并行的数据结构,能够让用户明确地在内存中持久化中间结果,控制其分区以优化数据的放置和使用丰富的操作符对其进行处理。

设计RDDs的主要挑战是,定义一个能够高效容错能力的编程接口。现有的基于集群的内存存储抽象,如分布式共享内存、键值对存储、数据库和Piccolo,提供了基于细粒度更新可变状态的接口(如,表中的cell)。运用这些接口时,达到数据容错的唯一方式是在不同的机器之间进行数据的冗余或者记录更新日志。这两种方法对于数据密集型工作来说代价高昂,因为他们需要在集群网络间复制大量数据,而网络带宽远小于RAM带宽,同时还会产生大量数据存储开销。

相比于这些系统,RDDs提供基于粗粒度转换的,可用于大量数据项进行相同操作的接口(如,map,filter和join)。这使得RDDs能够通过记录产生数据集的一系列转换操作(称之为lineage),而不是记录真实的数据,来提高容错的效率【1当lineage链过长时,对一些RDDs进行检查点(checkpoint)设置可能更加有用,我们在5.4节对其进行讨论】。如果一个分区的RDD丢失了,它有足够的信息知道自己是如何从其他的RDD产生的,从而重新计算该分区。因此,丢失的数据可以很快地恢复,而不需要代价昂贵的复制。

基于粗粒度转换的接口乍一看是有局限性的,但RDDs对于许多并行应用都非常适用,因为这些应用本质上就是会对多种数据项进行相同的操作。为此,我们证明了RDDs高效地运用于表达各种已经被现有的分布式系统所实现的集群编程模型,包括MapReduce,DryadLINQ,,SQL,Pregel 和 HaLoop,以及这些系统无法实现的新应用,如交互式数据挖掘。RDDs能够作为被用来解决前面提到的计算需求而引入的新框架的证据,是因为其强大的抽象能力。

我们已经在一个被用于UC Berkeley的实验环境和许多公司生产环境下的应用–Spark上,实现了RDDs。Spark提供了一个运用Scala语言,类似DryadLINQ的易用的语言集成编程接口。另外,Spark还可以在Scala解释器中进行交互式大数据集的查询。我们相信Spark会是第一个运用通用编程语言完成交互式速度下集群内存数据挖掘的系统。

我们通过微基准测试和用户应用对RDDs和Spark进行评估。我们得出,Spark在迭代式应用上比Hadoop快20倍,在真实数据报表分析上快40倍,并且能够在5-7秒的延迟内完成1TB数据集的交互式扫描。更加根本地,为了说明RDDs的通用性,我们在Spark上实现了Pregel和HaLoop的编程模型,包括用一些相对较小的库(每个库大概200行代码)实现它们所采用存储优化策略。

这篇论文首先介绍RDDs的概览(第2部分)和Spark(第3部分),然后讨论RDDs的内部表示(第4部分),实现(第5部分),和一些实验结果(第6部分)。最后,我们讨论了用RDDs实现几个现有的集群编程模型(第7部分),相关研究工作(第8部分)和总结。

2 弹性分布式数据集(RDDs)

这一部分提供关于RDDs的概述。先定义RDDs(2.1节),介绍其在Spark中的编程接口(2.2节)。然后将RDDs与细粒度共享内存抽象进行比较(2.3节)。最后讨论RDD模型的限制(2.4节)。

2.1 RDD抽象

一个RDD是只读的,是将记录进行分区的集合。RDDs只能由(1)稳定物理存储中的数据集(2)其他RDDs通过明确的操作产生。我们称这些操作为转换(transformations)以区别其他对RDDs的操作。转换的例子包括map,filter和join。【2 虽然单个RDDs是不可变的,但是可以通过多个RDDs来表示不同版本的数据集以实现多状态。我们让RDDs不可变以使lineage图表示更简便,但这也相当于将我们的抽象变成版本化数据并在lineage图中追踪不同版本】

RDDs不需要都实体化。一个RDD有足够的信息了解自己是如何从其他数据集产生的(lineage)并通过信息从稳定的物理存储中计算出自己的分区。这一强大特性的本质是,程序能够在RDD重建之后对其进行引用。

最后一点,用户可以对RDDs进行2方面的控制:持久化和分区。用户可以表明将要重用的RDDs并为其选择存储策略(如,内存存储)。也可以将RDDs的元素通过特定键值进行分区。这些功能对于存储优化特别有用,比如保证两个将要进行join操作的数据集都进行了相同的哈希分区。

2.2 Spark编程接口

Spark暴露了类似DryadLINQ和FlumeJava的RDDs语言集成API,每个数据集都被表示成一个对象,并通过执行方法在这些对象上进行转换操作(transformations)。

开发人员通过将物理存储上的数据集进行转换(如,map和filter)来定义一个或多个RDDs。然后可以用动作(actions)对这些RDDs进行操作,这些操作向应用返回值或者向存储系统产生外部数据。动作(actions)的例子包括,count(返回数据集中元素的数目),collect(返回元素本身)和save(将数据集输出到外部存储系统)。像DryadLINQ一样,Spark在遇到一个动作操作时才会真正计算出RDDs,所以其可以对转换(transformations)进行流水线操作。

此外,开发人员还可以调用persist方法来声明他们想重用的RDDs。Spark默认会将RDDs留存在内存中,但是会在没有足够RAM的情况下将它们溢出到硬盘。用户可以采用其他的持久化策略,如通过persist标价,将特定RDD只存在硬盘上或在机器之间进行备份。最后,用户可以为每个RDD设置优先级来表明当需要时,将哪一个内存中的数据溢出到硬盘中。

2.2.1 例子:控制台日志挖掘

假设一个网页服务出现错误,管理员想在HDFS中兆字节规模的日志中找到原因。应用Spark,管理员能够将错误信息从多个结点的日志中导入RAM,并交互式的进行查询。她会键入以下代码:

1
2
3
lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()

第一行从一个HDFS文件(文本行集合)定义了一个RDD,并在第二行产生过滤后的RDD。

第三行将errors持久化在内存中,这样就能被查询。注意filter的参数是一个闭包的Scala语法。

到目前为止,并没有在集群上运行作业。但是,用户可以对RDD进行动作(actions)操作,如计算消息的数目:

errors.count()

这位用户也可以对该RDD进行进一步的转换操作并运用他们的结果,如下:

1
2
3
4
5
6
7
8
// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
.map(_.split(’\t’)(3))
.collect()

在第一个对errors进行动作操作之后,Spark将在内存中村塾errors的各分区,这极大地加快了下游的计算操作。注意,最原始的RDD,lines,不会加载到RAM中。这是可取的,因为错误消息可能只是数据的一小部分(小到足够放进内存)。

最后为了说明模型是如何实现容错的,图1展示了该RDDs的lineage图。在这次查询中,我们首先对lines进行过滤得到errors,然后在运行collect操作前进行更进一步的filter和map操作。Spark调度器将会流水执行后两个转换操作并向缓存了errors分区的结点发送任务的集合来对其进行计算。除此之外,如果errors的一个分区丢失了,Spark只会在相关的lines分区上进行过滤操作来重建丢失的分区。

2.3 RDD模型的优势

为了理解RDDs作为分布式内存抽象的优势,表1将其与分布式共享内存(DSM)进行了比较。在DSM系统中,应用对全局地址空间进行随意位置的读写。请注意,根据此定义,我们不仅包含了传统的共享内存系统[24],还包括应用可以进行细粒度写共享状态的其他系统,如Piccolo[27],其提供共享DHT和分布式数据库。DSM是非常通用的抽象,但是这样的通用性使其很难以一种高效且容错的方式运用在商业集群上。

RDDs与DSM的主要不同是,RDDs只能通过粗粒度的转换操作产生(“写”),而DSM允许在内存中任意位置读写【注意对RDDs的读仍然可以是细粒度的。例如,应用系统可以将一个RDD视为一个大型只读查找表】。这会限制RDDs为执行批量写入的应用程序,但是这使得其具有高效的容错性。特别是,RDD不需要产生检查点(checkpoint)的开销,因为它们可以使用lineage来恢复【在一些应用中,在具有很长lineage链的RDDs中仍然可以使用checkpoint技术,我们将在5.4节讨论。但是,这些可以在后台完成,因为RDDs是不可变的,并且不需要像在DSM中那样保留整个应用程序的快照】。而且,只有丢失的RDD分区才需要在失败时重新计算,并且它们可以在不同的节点上并行计算,而不必回滚整个程序。

RDDs的第二个优势是,由于它们不可变的特性,通过运行缓慢任务的副本来缓解慢结点对系统拖拽的压力,就和MapReduce一样[10]。使用DSM很难实现备份任务,因为任务的两个副本将访问相同的内存位置并干扰彼此的更新。

最后,RDDs提供了DSM没有的两个其他好处。其一,在对RDDs的批量操作中,一个运行时可以基于数据的位置进行任务调度以提高性能。其二,当没有足够的内存来存储RDDs时,它就会优雅地降级,使它们仅用于扫描操作。不适合RAM的分区可以存储在磁盘上,并提供与当期数据并行系统类似的性能。

2.4 不适合RDDs的应用

正如引言中所讨论的,RDDs非常适用于对数据集中所有元素进行相同操作的批处理应用。在这些情况下,RDDs可以有效地将每个转换(transformations)记录为lineage图中的一个步骤,并且可以在不记录大量数据的情况下恢复丢失的分区。RDDs不太适合对共享状态进行异步细粒度共享状态更新的应用程序,例如Web应用程序的存储系统或增量Web爬网程序。对于这些应用程序,使用执行传统更新日志记录和checkpoint的系统更有效,例如数据库,RAMCloud [25],Percolator [26]和Piccolo [27]。我们的目标是为批量分析提供高效的编程模型,并将这些异步应用程序留给专用系统。我们的目标是为批量分析提供高效的编程模型,并将这些异步应用程序留给专用系统。

3 Spark编程接口

Spark通过Scala [2]提供类似于DryadLINQ [31]语言集成API的RDDs抽象,Scala [2]是基于Java VM的静态类型函数编程语言。我们之所以选择Scala,是因为它结合了简洁(便于交互使用)和效率(静态类型)。但是,关于RDD抽象的任何内容都不需要函数式语言

要使用Spark,开发人员编写一个连接到一组workers的driver程序,如图2所示。驱动程序定义一个或多个RDDs并在其上调用动作操作。驱动程序上的Spark代码也会追踪RDDs的lineage。这些workers是长期存在的过程,能够跨操作在RAM中存储RDDs分区。

正如我们在2.2.1中的日志挖掘例子中所展示,用户想如map这样的RDD操作传递闭包(函数形式)来提供参数。Scala将每个闭包表示为Java对象,这些对象可以序列化并加载到另一个节点上,以通过网络传递闭包。Scala也将在闭包中绑定的任何变量作为Java对象的域。比如,可以编写代码:var x = 5; rdd.map(_ + x),将在RDD中的每一个元素都加5【我们在闭包创建时对其进行保存,这样例子中的map操作永远都是加5,即使x发生变化】。

RDD本身是由元素类型参数化的静态类型对象。例如,RDD [Int]是整数的RDD。但是,由于Scala支持类型推断,因此我们的大多数示例都省略了类型。

虽然我们在Scala中暴露RDDs的方法在概念上很简单,但我们必须使用反射解决Scala的闭包对象的问题[33]。我们还需要更多的工作来使Spark可以使用Scala解释器,我们将在5.2节中讨论。但是,我们不必修改Scala编译器。

3.1 Spark中的RDD操作

表2列出了Spark中可用的主要RDD转换和动作操作。我们给出每个操作的签名,在方括号中显示类型参数。再次强调转换(transformations)是定义新RDD的延迟操作,而动作(actions)启动计算将向程序返回值或将数据写入外部存储。

请注意,某些操作(如join)仅适用于键值对的RDDs。此外,我们的函数名称被选择为与Scala和其他函数式语言中的其他API匹配;例如,map是一对一映射,而flatMap将每个输入值映射到一个或多个输出(类似于MapReduce中的映射)。

除了这些运算符,用户还可以持久化RDD。此外,用户可以获得RDD的分区顺序(由Partitioner类表示),并根据它对另一个数据集进行分区。诸如groupByKey,reduceByKey和sort操作自动地会产生哈希或范围分区的RDD。

3.2 示例应用

我们使用两个迭代应用程序补充了2.2.1节中的数据挖掘示例:逻辑回归和PageRank。后者还展示了如何控制RDD的分区以提高性能。

3.2.1 logistic回归

许多机器学习算法本质上是迭代的,因为它们运行迭代优化过程,例如梯度下降,以最大化功能。因此,通过将数据保存在内存中,以更快地运行。

例如,以下程序实现了逻辑回归[14],这是用于搜索最能分开两类点(例如,垃圾邮件和非垃圾邮件)的超平面w的一个通用算法。该算法使用梯度下降:它以随机值开始w,并且在每次迭代时,它将w的函数与数据相加以沿着改善它的方向移动。

1
2
3
4
5
6
7
8
9
val points = spark.textFile(...)
.map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
val gradient = points.map{ p =>
p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
}.reduce((a,b) => a+b)
w -= gradient
}

我们首先定义一个名为points的持久RDD作为文本文件上的map转换的结果,该文本文件将每行文本解析为Point对象。然后,我们通过对当前w的函数求和,对points重复运行map和reduce以计算每一步的梯度。在迭代的过程中将points保存在内存中可以获得20倍的加速,这将在6.1节展示。

3.2.2 PageRank

更复杂的数据共享模式发生在PageRank [6]。算法通过累加在文件中对每个文件的应用次数迭代地更新每一文件的rank。在每次迭代时,每个文件都向其邻居发送r/n的贡献值,r是其排名,n是其邻居的数量。然后通过α/N + (1 − α)∑ci式子更新排名,其中求和是它所收到的贡献值,而N是文件的总数。我们可以通过如下代码Spark中实现PageRank:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Load graph as an RDD of (URL, outlinks) pairs
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs
// with the contributions sent by each page
val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size))
}
// Sum contributions by URL and get new ranks
ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum)
}

该程序产生图3中的RDD lineage图。在每次迭代中,我们基于来自先前的迭代的contribs和ranks以及静态links数据集来创建新的排名数据集【请注意,尽管RDD是不可变的,但程序中的变量rank和contribs指向每次迭代时的不同RDDs】。图3的一个有趣特征是随着迭代数目的增加而不断增长。因此,在具有多次迭代的作业中,可能需要可靠地复制某些版本的ranks以减少故障恢复时间[20]。用户可以使用RELIABLE标志调用persist来执行此操作。但请注意,不需要复制links数据集,因为可以通过在输入文件的块上重新运行map来有效地重建它的分区。此数据集通常比ranks大得多,因为每个文档都有许多链接,但只有一个数字作为其排名,因此使用lineage恢复它比使用checkpoint回复程序整个内存中状态的系统更节省时间。

最后,我们可以通过控制RDDs的分区来优化PageRank中的通信。如果我们指定links的分区(例如,通过节点间的URL对link lists进行哈希分区),以相同的方式对ranks进行分区,这样就确保links和ranks之间的join操作不需要通信(因为每个URL的排名将与其link list在同一台机器上)。我们还可以编写自定义分区程序类来对相互链接的页面进行分组(例如,按域名对URL进行分区)。在定义links时,可以通过调用partitionBy来表示这两种优化:

1
2
links = spark.textFile(...).map(...)
.partitionBy(myPartFunc).persist()

在此初始化之后,links和ranks之间的join操作会自动地将每个URL的贡献值聚集到它link lists所在的机器上,在这台机器上计算新的排名并与它的links进行join操作。这种跨迭代的一致分区是Pregel等专用框架中的主要优化之一。 RDDs让用户直接表达这一目标。

4 表示RDDs

提供RDD作为抽象的挑战之一是为它们选择一种能够跟踪各种转换操作的lineage的表示。理想情况下,实现RDD的系统应该提供尽可能丰富的一组转换操作(例如,表2中的转换操作),并让用户以任意方式组合它们。我们为RDD提出了一个简单的基于图的表示,以达到这些目标。我们在Spark中使用这种表示来支持各种转换,而无需为调度器添加针对每个转换的特殊逻辑,这大大简化了系统设计。简而言之,我们建议通过一个公共接口来表示每个RDD,这个接口包含五条信息:一组分区,它们是数据集的原子部分;父RDDs(parent RDDs)的一组依赖关系;基于其父RDDs计算数据集的函数;有关其分区方案和数据放置的元数据。例如,表示HDFS文件的RDD具有文件的每个块的分区,并且知道每个块所在的机器。同时,对于这个RDD进行map操作的结果具有相同的分区,但是在计算其元素时将在父数据上应用map方法。我们在表3中总结了这个接口。

设计此接口时最有趣的问题是如何表示RDDs之间的依赖关系。我们发现将依赖关系分为两类是足够且有用的:窄依赖关系(narrow dependencies),其中父RDD的每个分区最多由子RDD的一个分区使用;宽依赖关系(wide dependencies),其中多个子RDD分区可能依赖一个父DD分区。例如,map导致窄依赖关系,而join导致宽依赖关系(除非父RDD是哈希分区的)。图4显示了其他示例。

这样区分有两个原因。首先,窄依赖关系允许在一个集群节点上进行流水线执行,这可以计算所有父分区。例如,可以逐个元素地应用map,然后应用filter操作。相比之下,宽依赖关系要求所有父分区的数据都已经计算完成,并使用类似MapReduce的操作在节点之间进行shuffle。其次,节点故障后的恢复在窄依赖时更有效,因为只需要重新计算丢失的对应的父分区,并且可以在不同节点上并行地重新计算它们。相反,在具有宽依赖的lineage图中,单个故障节点可能导致RDD的所有祖先丢失某些分区,从而需要完全重新执行计算。

RDDs的这个通用接口使得可以在少于20行代码中实现Spark中的大多数转换操作。实际上,即使是新的Spark用户也已经实现了新的转换(例如,采样和各种类型的join),而不用知道调度器的细节。我们在下面阐述一些RDD实现。

HDFS文件:
我们样本中的输入RDDs是HDFS中的文件。对于这些RDDs,partitions为文件的每个块返回一个分区(块的偏移量存储在每个Partition对象中),preferredLocations给出块所在的节点,iterator读取块。

map:
在任何RDD上调用map都会返回MappedRDD对象。该操作传递一个函数参数给map,对父RDD上的记录按照iterator的方式执行这个函数,并返回一组符合条件的父RDD分区及其位置。

union:
在两个RDD上执行union操作,返回两个父RDD分区的并集。通过相应父RDD上的窄依赖关系计算每个子RDD分区【7注意union操作不会过滤重复值】。

join:
对两个RDD执行join操作可能产生窄依赖(如果这两个RDD拥有相同的哈希分区或范围分区),可能是宽依赖,也可能两种依赖都有(比如一个父RDD有分区,而另一父RDD没有)。在任何一种情况下,输出RDD都有一个分区程序(从父项继承的分区程序或默认的散列分区程序)。

5 实现

我们大约用14000行scala代码实现了Spark。该系统运行在Mesos集群管理器[17]上,允许它与Hadoop,MPI和其他应用程序共享资源。每个Spark程序作为单独的Mesos应用程序运行,具有自己的驱动程序(master)和工作程序(workers),这些应用程序之间的资源共享由Mesos管理的。Spark可以使用Hadoop现有的输入插件API从任何Hadoop输入源(例如,HDFS或HBase)读取数据,并在未经修改的Scala版本上运行。

我们现在简要介绍系统中几个技术上有趣的部分:我们的作业调度程序(第5.1节),允许交互式使用的Spark解释器(第5.2节),内存管理(第5.3节)和支持检查点(第5.4节)。

5.1 作业调度

Spark的调度程序使用我们在第4节所述的RDD表示。

总的来说,我们的调度程序类似于Dryad的[19],但是另外考虑了RDDs在内存中持久化的分区。每当用户在RDD上运行动作(例如,count或save)时,该调度程序就会检查该RDD的lineage图以构建要执行的stage的DAG,如图5所示。每个阶段包含尽可能多的具有窄依赖的流水线转换。阶段的边界是宽依赖所需的shuffle操作,或任何已经计算过的分区,它们可以跳过父RDD的计算。然后,调度程序启动任务以计算每个阶段中丢失的分区,直到计算出目标RDD为止。

我们的调度程序基于数据位置使用延迟调度将任务分配给机器[32]。如果任务需要处理节点内存中可用的分区,我们会将其发送到该节点。否则,如果任务处理包含RDD提供优选位置的分区(例如,HDFS文件),我们将其发送给那些分区。

对于宽依赖关系(即,shuffle依赖关系),我们目前在包含父分区的节点上物化中间记录以简化故障恢复,就像MapReduce物化map输出一样。

如果任务失败,只要其阶段的父项仍然可用,我们就会在另一个节点上重新运行它。如果某些阶段变得不可用(例如,因为来自shuffle的“map side”的输出丢失),我们重新提交任务以并行计算丢失的分区。我们还不能解决调度程序的失败,尽管复制RDD的lineage图会更简单。

最后,尽管Spark中的所有计算当前都是为响应驱动程序中调用的操作而运行的,但我们也在尝试让集群上的任务(例如,映射)调用lookup操作,其提供根据键值对哈希分区的RDDs中的元素进行随机获取。在这种情况下,任务需要告诉调度程序在缺少时计算所需的分区。

5.2 解释器整合

Scala包含一个类似于Ruby和Python的交互式shell。鉴于内存数据的延迟较低,我们希望让用户从解释器主动运行Spark来查询大数据集。

Scala解释器通常通过为用户键入的每一行编译一个类,将其加载到JVM中,并在其上调用函数来操作。该类包含一个单例对象,该对象包含该行上的变量或函数,并在初始化方法中运行行代码。例如,如果用户输入代码var x = 5,接着又输入println(x),则解释器会定义一个包含x的Line1类,并将第2行编译为println(Line1.getInstance().x)。

在Spark中我们对解释器做了两点改动:

1.类传输:解释器能够支持基于HTTP传输类字节码,这样worker节点就能获取输入每行代码对应的类的字节码。

2.改进的代码生成逻辑:通常每行上创建的单例对象通过对应类上的静态方法进行访问。也就是说,如果要序列化一个闭包,它引用了前面代码行中变量,比如上面的例子Line1.x,Java不会根据对象关系传输包含x的Line1实例。所以worker节点不会收到x。我们将这种代码生成逻辑改为直接引用各个行对象的实例。

图6显示了在我们更改之后,解释器如何将用户键入的一组行转换为Java对象。

Spark解释器便于跟踪处理大量对象关系引用,并且便利了HDFS数据集的研究。我们计划以Spark解释器为基础,开发提供高级数据分析语言支持的交互式工具,比如SQL。

5.3 内存管理

Spark提供了三种持久化RDD的选项:反序列化Java对象的内存存储,序列化数据的内存存储和硬盘存储。第一个选项提供最快的性能,因为Java VM可以原生访问每个RDD元素。第二个选项允许用户在空间有限时选择比Java对象图更高效的内存表示,但代价是性能较低【成本取决于应用程序每个字节数据的计算量,但轻量级处理的最大值可提升2倍】。第三个选项对于太大而无法保留在RAM中但在每次使用时重新计算成本高昂的RDD非常有用

为了管理可用的有限内存,我们在RDDs层面上使用LRU替换策略。当计算新的RDD分区但没有足够的空间来存储它时,我们从最近最少的RDD中替换出一个分区,除它这与具有新分区的RDD相同。在这种情况下,我们将旧分区保留在内存中,以防止来自同一RDD的分区循环进出。这很重要,因为大多数操作都会在整个RDD上运行任务,因此很可能将来需要已经在内存中的分区。到目前为止,我们发现此默认策略在所有应用程序中都能正常运行,但我们还通过每个RDD的“持久化优先级”为用户提供进一步控制。

最后,群集上的每个Spark实例当前都有自己独立的内存空间。在未来的工作中,我们计划通过统一的内存管理器来研究跨Spark实例共享RDDs。

5.4 对检查点的支持

尽管在故障之后可以始终使用lineage来恢复RDDs,但对于具有长lineage的RDDs来说,这种恢复可能是耗时的。因此,将一些RDDs checkpoint到稳定存储可能会有所帮助。

通常,检查点技术对于包含宽依赖关系的长lineage图的RDDs很有用,例如我们的PageRank示例(第3.2.2节)中的rank数据集。在这些情况下,集群中的节点故障可能导致每个父RDD丢失一些数据片段,从而需要完全重新计算[20]。相反,对于对稳定存储中的数据具有窄依赖的RDDs,例如我们的逻辑回归示例(第3.2.1节)中的points和PageRank中的link lists,检查点技术可能没多大用。如果节点发生故障,则可以在其他节点上并行重新计算从这些RDD中丢失的分区,而这只是复制整个RDD的成本的一小部分。

Spark目前提供了一个用于检查点技术的API(一个在persist中的REPLICATE标志),但是由用户决定哪些数据使用检查点。但是,我们还在研究如何进行自动检查。因为我们的调度程序知道每个数据集的大小以及首次计算它所花费的时间,所以它应该能够选择一组最佳RDDs来检查点以最小化系统恢复时间[30]。

最后,要强调的一点是,RDDs的只读特性使它们比通用共享存储更容易进行检查。由于一致性问题不需要考虑,因此可以在后台写出RDD,而无需程序暂停或采用分布式快照方案。

6 评估

我们通过Amazon EC2上的一系列实验以及用户应用程序的基准评估了Spark和RDDs。总的来说,我们的结果显示如下:

  • 在迭代机器学习和图形应用程序中,Spark的性能比Hadoop高出20倍。加速来自于通过将数据作为Java对象存储在内存中来避免I / O和反序列化成本。
  • 我们的用户编写的应用程序可以很好地执行和扩展。特别是,我们使用Spark分析报表比在Hadoop上运行快40倍。
  • 当节点发生故障时,Spark可以通过仅重建丢失的RDD分区来快速恢复。
  • Spark可用于交互查询1 TB数据集,延迟仅为5-7秒。

我们首先与Hadoop进行迭代机器学习(第6.1节)和PageRank(第6.2节)的基准比较。然后,我们评估Spark中的故障恢复(第6.3节)以及数据集不适合存储时的行为(第6.4节)。最后,我们讨论了用户应用(第6.5节)和交互式数据挖掘(第6.6节)的结果。

除非另有说明,否则我们的测试使用m1.xlarge EC2节点,其中包含4个内核和15 GB RAM。我们使用HDFS进行存储,具有256 MB块。在每次测试之前,我们清除了OS缓冲区高速缓存,以准确测量IO成本。

6.1 迭代式机器学习应用

我们实现了两个迭代机器学习应用程序,逻辑回归和k-means,以比较以下系统的性能:

  • Hadoop:The Hadoop 0.20.2 stable release。
  • HadoopBinMem:在首轮迭代中执行预处理,通过将输入数据转换成为开销较低的二进制格式来减少后续迭代过程中文本解析的开销,在HDFS中加载到内存。
  • Spark:基于RDDs的实现。

我们使用25-100台机器在100 GB数据集上对这两个算法进行了10次迭代。两个应用程序之间的关键区别是它们每个数据字节执行的计算量。k-means的迭代时间由计算决定,而逻辑回归的计算密集度较低,但对反序列化和I / O花费的时间更敏感。

由于典型的学习算法需要数十次迭代才能收敛,因此我们分别报告第一次迭代和后续迭代的时间。我们发现通过RDDs共享数据可以大大加快未来的迭代速度。

首次迭代
所有三个系统在第一次迭代中从HDFS读取文本输入。如图7中的浅色柱状图所示,Spark在实验中比Hadoop要快一些。如图7中的灯条所示,Spark在实验中比Hadoop要快一些。HadoopBinMem是最慢的,因为它通过一个额外的MapReduce作业将数据转换成二进制格式,并必须通过网络在HDFS结点间复制数据。

后续迭代
图7还显示了后续迭代的平均运行时间,而图8显示了其随集群大小变化而产生的时间变化。对于逻辑回归,Spark在100台机器上分别比Hadoop和HadoopBinMem快25.3倍和20.7倍。对于更加计算密集型的k-means应用程序来说,Spark仍然实现了1.9倍到3.2倍的加速。

理解速度提升
我们惊讶地发现Spark甚至比内存存储二进制数据的Hadoop(HadoopBinMem)还要快20倍。在HadoopBinMem中,我们使用了Hadoop的标准二进制格式(SequenceFile)和256 MB大小的超大块,并且我们强制HDFS的数据目录位于内存文件系统中。但是,由于以下几个因素,Hadoop仍然运行缓慢:

1.Hadoop软件堆栈的最小开销,

2.提供数据时HDFS的开销,

3.将二进制记录转换为可用的内存中Java对象的反序列化成本。

我们依次研究了这些因素。为了估测1,我们运行空的Hadoop作业,仅仅执行作业的初始化、启动任务、清理工作就至少耗时25秒。对于2,我们发现为了服务每一个HDFS数据块,HDFS进行了多次复制以及计算校验和操作。

最后,为了估测3,我们在一台机器上运行微基准测试,以256 MB输入的各种格式运行逻辑回归计算。特别是,我们比较了处理来自HDFS(HDFS堆栈中的开销将给出)和内存本地文件(内核可以非常有效地将数据传递给程序)的文本和二进制输入的时间。

结果如图9所示。内存中HDFS和本地文件之间的差异表明,通过HDFS读取产生了2秒的开销,即使数据存储在本地机器上也是如此。文本和二进制输入之间的差异表明解析开销为7秒。最后,即使从内存文件中读取,将预解析的二进制数据转换为Java对象也需要3秒钟,这仍然几乎与逻辑回归本身一样开销高昂。通过将RDD元素直接存储为内存中的Java对象,Spark可以避免所有这些开销。

6.2 PageRank

我们使用54 GB Wikipedia导出数据比较了Spark与Hadoop 进行PageRank的性能。PageRank算法通过10轮迭代处理了大约400万文章的链接图数据。图10展示了单独的内存存储使Spark在30个节点上的速度比Hadoop提高了2.4倍。此外,如第3.2.2节所述,控制RDD的分区以使其在迭代中保持一致,将提高到7.4倍。加速也能几乎线性地扩展到60个节点上。

我们还评估了使用Spark实现Pregel版本的PageRank的性能,结果将在7.1节展示。迭代时间与图10中的相似,但是更长约4秒,因为Pregel在每次迭代时运行一个额外的操作,让顶点“投票”是否完成工作。

6.3 错误恢复

我们评估了k-means应用程序中节点故障后使用lineage重建RDD分区的开销。图11比较了正常操作场景中75节点集群上k-means的10次运行的运行时间,其中一个节点在第6次迭代开始时失败。另一个没有任何故障,每次迭代包含400个任务,处理100 GB的数据。

直到第5次迭代结束,迭代时间约为58秒。在第6次迭代中,其中一台机器被杀死,导致在该机器上运行的任务和存储在那里的RDD分区丢失。Spark在其他机器上并行重新执行这些任务,他们通过lineage重新读取相应的输入数据和重建的RDD,导致操作时间增加到80秒。一旦重建丢失的RDD分区,迭代时间就会回落到58秒。

请注意,使用基于检查点的故障恢复机制,恢复可能需要重新运行至少几次迭代,具体取决于检查点的频率。此外,系统需要通过网络复制应用程序的100 GB工作集(文本输入数据转换为二进制),并且要么消耗两倍于Spark的内存以将其复制到RAM中,要么必须等待写入100 GB到磁盘。相比之下,我们示例中RDD的lineage图的大小都小于10 KB。

6.4 内存不足时表现

到现在为止,我们能保证集群中的每个节点都有足够的内存去缓存迭代过程中使用的RDDs。一个自然的问题是,如果没有足够的内存来存储作业的数据,Spark是如何运行的。在本实验中,我们将Spark配置为不使用超过一定百分比的内存来在每台机器上存储RDD。图12中,我们为的逻辑回归提供了各种存储空间的配置。我们发现性能在控件降低时缓慢降低。

6.5 用Spark构建的用户应用程序

内存分析
Conviva Inc是一家视频发行公司,它使用Spark加速了之前在Hadoop上运行的大量数据分析报告。例如,一份报告作为一系列Hive [1]查询运行,这些查询计算出了客户的各种统计信息。这些查询都在相同的数据子集上工作(与客户提供的过滤器匹配的记录),但在不同的分组字段上执行了聚合(平均值,百分位数和COUNT DISTINCT),需要使用单独的MapReduce作业。通过在Spark中实现查询并将一次共享的数据子集加载到RDD中,该公司能够将报告速度提高40倍。在Hadoop集群上花费20小时的200 GB压缩数据的报告现在仅使用两台Spark计算机就能在30分钟内运行完成。此外,Spark程序只需要96 GB的RAM,因为它只存储与RDD中客户的过滤器匹配的行和列,而不是整个解压缩文件。

城市交通模型
在Berkeley的Mobile Millennium项目[18]中,基于一系列分散的汽车GPS监测数据,研究人员使用并行化机器学习算法来推算公路交通拥堵状况。数据来自市区10000个互联的公路线路网,还有600000个由汽车GPS装置采集到的样本数据,这些数据记录了汽车在两个地点之间行驶的时间(每一条路线的行驶时间可能跨多个公路线路网)。使用一个交通模型,通过推算跨多个公路网行驶耗时预期,系统能够估算拥堵状况。研究人员使用Spark实现了一个可迭代的EM算法,其中包括向Worker节点广播路线网络信息,在E和M阶段之间执行reduceByKey操作,应用从20个节点扩展到80个节点(每个节点4核),如图13(a)所示:

推特垃圾邮件分类
Berkeley的Monarch项目[29]使用Spark识别Twitter消息上的垃圾链接。他们在Spark上实现了一个类似6.1小节中示例的Logistic回归分类器,不同的是使用分布式的reduceByKey操作并行对梯度向量求和。图13(b)显示了基于50G数据子集训练训练分类器的结果,整个数据集是250000的URL、至少10^7个与网络相关的特征/维度,内容、词性与访问一个URL的页面相关。随着节点的增加,这并不像交通应用程序那样近似线性,主要是因为每轮迭代的固定通信代价较高。

6.6 交互式数据挖掘

为了演示Spark交互式查询大数据集的能力,我们用它来分析1TB的维基百科页面浏览日志(2年的数据)。在本次实验中,我们使用了100 m2.4xlarge EC2实例,每个实例有8个内核和68 GB内存。在整个输入数据集上简单地查询如下内容以获取页面浏览总数:(1)全部页面;(2)页面的标题能精确匹配给定的关键词;(3)页面的标题能部分匹配给定的关键词。

图14显示了分别在整个、1/2、1/10的数据上查询的响应时间,甚至1TB数据在Spark上查询仅耗时5-7秒,这比直接操作磁盘数据快几个数量级。例如,从磁盘上查询1TB数据耗时170秒,这表明了RDD缓存使得Spark成为一个交互式数据挖掘的强大工具。

7 讨论

虽然RDD由于其不可变性和粗粒度转换似乎提供有限的编程接口,但我们发现它们适用于广泛的应用。特别地,RDDs可以表达数量惊人的集群编程模型,这些模型迄今已被提议作为单独的框架,允许用户在一个程序中组合这些模型(例如,运行MapReduce操作来构建图形,然后在其上运行Pregel)并在它们之间共享数据。在本节中,我们将讨论RDDs可以表达哪些编程模型以及它们如此广泛适用的原因(第7.1节)。此外,我们讨论了我们正在探索的RDDs中lineage信息的另一个好处,能够促进这些模型的调试(第7.2节)。

7.1 表达现有的编程模型

RDDs可以有效地表达迄今为止独立提出的许多集群编程模型。“有效”,意味着RDDs不仅可以用于生成与这些模型中编写的程序相同的输出,而且RDDs还可以实现这些框架具有的优化,例如将特定数据保存在内存中,将其分区为最大限度地减少通信,并有效地从故障中恢复。使用RDD表达的模型包括:

MapReduce: 可以使用Spark中的flatMap和groupByKey操作表示此模型,如果存在组合器,则可以使用reduceByKey表示.

DryadLINQ:
与更普遍的Dryad运行时相比,DryadLINQ系统提供了比MapReduce更广泛的运算符,但这些都是直接对应于Spark(map,groupByKey,join等)中可用的RDD转换的批量运算符。

SQL:
与DryadLINQ表达式一样,SQL查询对记录集执行数据并行操作。

Pregel:
Google的Pregel [22]是迭代图形应用程序的专用模型,起初与其他系统中的面向集合的编程模型完全不同。在Pregel中,程序作为一系列协调的“supersteps”运行。在每个supersteps中,图中的每个顶点都运行一个用户函数,可以更新与顶点相关的状态,更改图形拓扑,并将消息发送到其他顶点用于下一个superstep。该模型可以表达许多图算法,包括最短路径,二分匹配和PageRank。

让我们用RDDs实现这个模型的关键点是Pregel将相同的用户函数应用于每次迭代的所有顶点。因此,我们可以将每次迭代的顶点状态存储在RDD中,并执行批量转换(flatMap)以应用此函数并生成消息的RDD。之后可以将该RDD与顶点状态连接操作以表示消息的转换。同样重要的是,RDDs允许我们像Pregel那样将顶点状态保存在内存中,通过控制其分区来最小化通信,并支持故障时的部分恢复。我们在Spark上实现了Pregel,其作为200行库,并向读者推荐[33]以获取更多详细信息。

迭代式MapReduce: 最近提出的几个系统,包括HaLoop [7]和Twister [11],提供了迭代的MapReduce模型,用户可以为系统提供一系列MapReduce作业循环执行。系统使数据在迭代中保持一致,Twister也可以将其保存在内存中。两种优化都很容易用RDDs表达,我们能够使用Spark将HaLoop实现为200行库。

流式批处理: 研究人员最近提出了几种增量处理系统,用于定期用新数据更新结果的应用[21,15,4]。例如,每15分钟更新一次广告点击统计数据的应用程序应该能够将前一个15分钟窗口的中间状态与新日志中的数据相结合。这些系统执行类似于Dryad的批量操作,但将应用程序状态存储在分布式文件系统中。将中间状态置于RDDs中将加速其处理。

解释RDDs的表达性能
为什么RDD能够表达这些不同的编程模型?原因是对RDD的限制对许多并行应用程序几乎没有影响。特别是,尽管RDDs只能通过批量转换创建,但许多并行程序本质上就是将相同的操作应用于许多记录,所以使其易于用RDDs表达。类似地,RDD的不变性不是一个障碍,因为可以创建多个RDD来表示同一数据集的不同版本。实际上,今天的许多MapReduce应用程序都运行在不允许更新文件的文件系统上,例如HDFS。

最后一个问题是为什么以前的框架没有提供相同的一般性。我们认为这是因为这些系统探索了MapReduce和Dryad无法很好处理的特定问题,例如迭代,但是没有观察到这些问题的常见原因是缺乏数据共享抽象。

7.2 利用RDDs进行调试

虽然我们最初设计的RDD在确定性方面可以重新计算以实现容错,但这个属性也可以简化调试。特别是,通过记录在作业期间创建的RDD的lineage,可以(1)稍后重建这些RDD并让用户以交互方式查询它们,以及(2)在单个过程调试中重新执行作业的任何任务,通过重新计算它所依赖的RDD分区。与通用分布式系统[13]的传统重放调试器不同,不必捕获或推断跨多节点的时间顺序,这种方法几乎不增加记录开销,因为只需要记录RDD lineage图【9 与这些系统不同,基于RDD的调试器不会重放用户函数中的非确定性行为(例如,非确定性映射),但它至少可以通过校验和数据来报告它】。我们目前正在开发基于这些想法的Spark调试器[33]。

8 相关工作

集群编程模型:
集群编程模型的相关工作分为几类。首先,MapReduce [10],Dryad [19]和Ciel [23]等数据流模型支持丰富的运算符集,用于处理数据,但通过稳定的存储系统共享数据。RDDs代表比稳定存储更有效的数据共享抽象,因为它们避免了数据复制,I / O和序列化的成本【10 请注意,在像RAMCloud [25]这样的内存数据存储中运行MapReduce / Dryad仍然需要数据复制和序列化,这对于某些应用程序来说可能代价很高,如6.1节所示】。

其次,数据流系统的几个高级编程接口,包括DryadLINQ [31]和FlumeJava [8],提供了语言集成的API,用户通过map和join等操作符操作“并行集合”。但是,在这些系统中,并行集合表示磁盘上的文件或用于表示查询计划的临时数据集。虽然系统会在相同的操作符查询间流水式地处理数据(如,一个map操作接着一个map操作),但是它们并不能在各个查询之间有效地共享数据。我们在并行收集模型上基于Spark的API,因为它的方便性,并没有增加语言集成接口的新颖性,但通过提供RDD作为此接口背后的存储抽象,我们允许它支持更广泛的应用程序。

第三类系统为需要数据共享的特定类别的应用程序提供高级接口。例如,Pregel [22]支持迭代图应用,而Twister [11]和HaLoop [7]是迭代MapReduce运行时。但是,这些框架隐式地为他们支持的计算模式提供数据共享,并且不提供一般抽象来供用户选择。例如,用户不能使用Pregel或Twister将数据集加载到内存中,然后决定在其上运行哪个查询,RDD明确地提供分布式存储抽象,因此可以支持这些专用系统不支持的应用,例如交互式数据挖掘。

最后,一些系统暴露共享可变状态以允许用户执行内存计算。例如,Piccolo [27]允许用户运行并行功能以读取和更新分布式哈希表中的单元格。分布式共享存储(DSM)系统[24]和键值存储如RAMCloud [25]提供一个相似的模型。RDD在两个方面与这些系统不同。首先,RDD基于运算符(如map,sort和join)提供更高级别的编程接口,而Piccolo和DSM中的接口只是对表格单元格的读取和更新。其次,Piccolo和DSM系统通过检查点和回滚实现恢复,这比许多应用程序中基于lineage策略的RDDs更昂贵。最后,正如第2.3节所讨论的那样,RDD还提供了其他优于DSM的优势,例如straggler缓解。

缓存系统:
Nectar [12]可以通过程序分析识别常见的子表达式,在DryadLINQ作业中重用中间结果[16]。这种能力对于添加到基于RDD的系统非常有吸引力。但是,Nectar不提供内存中缓存(它将数据放在分布式文件系统中),也不允许用户明确控制要持久化的数据集以及如何对它们进行分区。Ciel [23]和FlumeJava [8]同样可以缓存任务结果,但不提供内存缓存或显式控制缓存哪些数据。Ananthanarayanan等。建议在分布式文件系统中添加内存缓存,以利用数据访问的时间和空间局部性[3]。虽然此解决方案可以更快地访问文件系统中已有的数据,但它不像在RDDs中那样有效地在一个应用中共享中间结果,因为它仍然需要应用程序在不同阶段间将这些结果写入文件系统。

Lineage:
记载数据的lineage或起源信息长期以来一直是科学计算和数据库中的研究课题,应用于解释结果,允许数据可以被别的数据重建,同时如果在工作流中出现了bug或者数据及丢失了,能够重新计算得到数据。我们推荐读者阅读[5]和[9]来了解这些工作。RDDs提供并行编程模型,其中获取细粒度的lineage成本低廉,因此可用于故障恢复。

我们基于lineage的恢复机制也类似于MapReduce和Dryad中计算(作业)中使用的恢复机制,它跟踪任务的DAG之间的依赖关系。但是,在这些系统中,谱系(lineage)信息在作业结束后丢失,需要使用复制的存储系统来跨计算共享数据。相比之下,RDDs应用lineage来有效地跨计算保留内存数据,而无需复制和磁盘I / O的成本。

关系型数据库:
RDDs在概念上类似于数据库中的视图,而持久化RDDs类似于物化视图[28]。但是,与DSM系统一样,数据库通常允许对所有记录进行细粒度的读写访问,需要记录操作和数据以实现容错,并且需要额外的开销来维护一致性。RDD的粗粒度转换模型不需要这些开销。

9 结论

我们提供了弹性分布式数据集(RDDs),这是一种高效,通用和容错的用于在集群应用程序中共享数据的抽象。RDDs可以表达各种并行应用程序,包括已经提出用于迭代计算的许多专用编程模型,以及这些模型还未实现的新应用程序。与现有的集群存储抽象(需要数据复制以实现容错)不同,RDDs提供基于粗粒度转换的API,使其能够使用lineage来有效地恢复数据。我们在Spark中实现了RDDs,它在迭代应用程序中的性能比Hadoop高出20倍,并且可以交互式查询数百GB的数据。

我们在spark-project.org上提供了开源Spark作为可扩展数据分析和系统研究的工具。

致谢

我们感谢第一批Spark用户,包括Tim Hunter,Lester Mackey,Dilip Joseph和Jibin Zhan,他们在实际应用中尝试我们的系统,提供了许多好的建议,并指出了一些研究中的挑战。我们还要感谢我们的指导者Ed Nightingale以及审核的反馈。这项研究部分由Berkeley AMP Lab
支持,由 Google, SAP, Amazon Web Services, Cloudera, Huawei, IBM, Intel, Microsoft, NEC, NetApp 和 VMWare,DARPA,the Natural Sci- ences 和 Engineering Research Council of Canada赞助。

引用

[1] Apache Hive. http://hadoop.apache.org/hive.

[2] Scala. http://www.scala-lang.org.

[3] G.Ananthanarayanan,A.Ghodsi,S.Shenker,andI.Stoica.
Disk-locality in datacenter computing considered irrelevant. In
HotOS ’11, 2011.

[4] P.Bhatotia,A.Wieder,R.Rodrigues,U.A.Acar,and
R. Pasquin. Incoop: MapReduce for incremental computations.
In ACM SOCC ’11, 2011.

[5] R.BoseandJ.Frew.Lineageretrievalforscientificdata
processing: a survey. ACM Computing Surveys, 37:1–28, 2005.

[6] S.BrinandL.Page.Theanatomyofalarge-scalehypertextual
web search engine. In WWW, 1998.

[7] Y.Bu,B.Howe,M.Balazinska,andM.D.Ernst.HaLoop:
efficient iterative data processing on large clusters. Proc. VLDB
Endow., 3:285–296, September 2010.

[8] C.Chambers,A.Raniwala,F.Perry,S.Adams,R.R.Henry,
R. Bradshaw, and N. Weizenbaum. FlumeJava: easy, efficient
data-parallel pipelines. In PLDI ’10. ACM, 2010.

[9] J.Cheney,L.Chiticariu,andW.-C.Tan.Provenancein
databases: Why, how, and where. Foundations and Trends in
Databases, 1(4):379–474, 2009.

[10] J.DeanandS.Ghemawat.MapReduce:Simplifieddata
processing on large clusters. In OSDI, 2004.

[11] J. Ekanayake, H. Li, B. Zhang, T. Gunarathne, S.-H. Bae, J. Qiu, and G. Fox. Twister: a runtime for iterative mapreduce. In HPDC ’10, 2010.

[12] P.K.Gunda,L.Ravindranath,C.A.Thekkath,Y.Yu,and L. Zhuang. Nectar: automatic management of data and computation in datacenters. In OSDI ’10, 2010.

[13] Z.Guo,X.Wang,J.Tang,X.Liu,Z.Xu,M.Wu,M.F. Kaashoek, and Z. Zhang. R2: an application-level kernel for record and replay. OSDI’08, 2008.

[14] T.Hastie,R.Tibshirani,andJ.Friedman.TheElementsof Statistical Learning: Data Mining, Inference, and Prediction. Springer Publishing Company, New York, NY, 2009.

[15] B.He,M.Yang,Z.Guo,R.Chen,B.Su,W.Lin,andL.Zhou. Comet: batched stream processing for data intensive distributed computing. In SoCC ’10.

[16] A.Heydon,R.Levin,andY.Yu.Cachingfunctioncallsusing precise dependencies. In ACM SIGPLAN Notices, pages 311–320, 2000.

[17] B.Hindman,A.Konwinski,M.Zaharia,A.Ghodsi,A.D. Joseph, R. H. Katz, S. Shenker, and I. Stoica. Mesos: A platform for fine-grained resource sharing in the data center. In NSDI ’11.

[18] T.Hunter,T.Moldovan,M.Zaharia,S.Merzgui,J.Ma,M.J. Franklin, P. Abbeel, and A. M. Bayen. Scaling the Mobile Millennium system in the cloud. In SOCC ’11, 2011.

[19] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. In EuroSys ’07, 2007.

[20] S.Y.Ko,I.Hoque,B.Cho,andI.Gupta.Onavailabilityof intermediate data in cloud computations. In HotOS ’09, 2009.

[21] D. Logothetis, C. Olston, B. Reed, K. C. Webb, and K. Yocum. Stateful bulk processing for incremental analytics. SoCC ’10. [22] G.Malewicz,M.H.Austern,A.J.Bik,J.C.Dehnert,I.Horn,
N. Leiser, and G. Czajkowski. Pregel: a system for large-scale
graph processing. In SIGMOD, 2010.

[23] D.G.Murray,M.Schwarzkopf,C.Smowton,S.Smith,
A. Madhavapeddy, and S. Hand. Ciel: a universal execution
engine for distributed data-flow computing. In NSDI, 2011.

[24] B.NitzbergandV.Lo.Distributedsharedmemory:asurveyof
issues and algorithms. Computer, 24(8):52 –60, Aug 1991.

[25] J.Ousterhout,P.Agrawal,D.Erickson,C.Kozyrakis,
J. Leverich, D. Mazie`res, S. Mitra, A. Narayanan, G. Parulkar, M. Rosenblum, S. M. Rumble, E. Stratmann, and R. Stutsman. The case for RAMClouds: scalable high-performance storage entirely in DRAM. SIGOPS Op. Sys. Rev., 43:92–105, Jan 2010.

[26] D.PengandF.Dabek.Large-scaleincrementalprocessingusing distributed transactions and notifications. In OSDI 2010.

[27] R.PowerandJ.Li.Piccolo:Buildingfast,distributedprograms with partitioned tables. In Proc. OSDI 2010, 2010.

[28] R.RamakrishnanandJ.Gehrke.DatabaseManagement Systems. McGraw-Hill, Inc., 3 edition, 2003.

[29] K.Thomas,C.Grier,J.Ma,V.Paxson,andD.Song.Designand evaluation of a real-time URL spam filtering service. In IEEE Symposium on Security and Privacy, 2011.

[30] J.W.Young.Afirstorderapproximationtotheoptimum checkpoint interval. Commun. ACM, 17:530–531, Sept 1974.

[31] Y.Yu,M.Isard,D.Fetterly,M.Budiu,U ́.Erlingsson,P.K. Gunda, and J. Currey. DryadLINQ: A system for general-purpose distributed data-parallel computing using a high-level language. In OSDI ’08, 2008.

[32] M.Zaharia,D.Borthakur,J.SenSarma,K.Elmeleegy,
S. Shenker, and I. Stoica. Delay scheduling: A simple technique for achieving locality and fairness in cluster scheduling. In EuroSys ’10, 2010.

[33] M.Zaharia,M.Chowdhury,T.Das,A.Dave,J.Ma,
M. McCauley, M. Franklin, S. Shenker, and I. Stoica. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. Technical Report UCB/EECS-2011-82, EECS Department, UC Berkeley, 2011.