Spark入门学习
文章旨在为初学Spark的伙伴提供导读和入门的帮助。
Spark简介
Apache Spark是一种快速、通用、可扩展的大数据分析引擎。它是不断壮大的大数据分析解决方案家族中备受关注的明星成员,为分布式数据集的处理提供了一个有效框架,并以高效的方式处理分布式数据集。Spark集批处理、实时流处理、交互式查询与图计算于一体,避免了多种运算场景下需要部署不同集群带来的资源浪费。Spark在过去的2014年中获得了极大关注,并得到广泛应用,Spark社区也成为大数据领域和Apache软件基金会最活跃的项目之一,其活跃度甚至远超曾经只能望其项背的Hadoop。
Spark的优点
第一个优点是速度。
与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上;而基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG(数据库可用性组)执行引擎,可以通过基于内存来高效处理数据流。
第二个优点是易用性。
Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,这意味着可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法,而不是像以前一样,需要打包、上传集群、验证等。这对于原型开发非常重要。
第三个优点是通用性。
Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(通过Spark SQL)、实时流处理(通过Spark Streaming)、机器学习(通过Spark MLlib)和图计算(通过Spark GraphX),这些不同类型的处理都可以在同一个应用中无缝使用。
第四个优点是可融合性。
Spark可以非常方便地与其他的开源产品进行融合。如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cas-sandra等,不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架。
Spark性能比Hadoop快的原因
传统Hadoop的数据抽取运算基于磁盘,中间结果也是存储在磁盘上。
Spark则使用内存代替了传统HDFS存储中间结果:第一代的Hadoop完全使用Hdfs存储中间结果,第二代的Hadoop加入了cache来保存中间结果。
Spark架构综述
组件介绍
Driver是用户编写的数据处理逻辑,这个逻辑中包含用户创建的SparkContext。
SparkContext是用户逻辑与Spark集群主要的交互接口,它会和ClusterManager交互,包括向它申请计算资源等。
Cluster Manager负责集群的资源管理和调度,现在支持Standalone、Apache Mesos(集群管理器)和Hadoop的YARN(Apache新MapReduce框架)。
Worker Node是集群中可以执行计算任务的节点。
Executor是在一个Worker Node上为某应用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的Executor,计算最终在计算节点的Executor中执行。
Task是被送到某个Executor上的计算单元。
执行过程
用户程序从最开始的提交到最终的计算执行,需要经历以下几个阶段:
1)用户程序创建SparkContext时,新创建的SparkContext实例会连接到Cluster Manager。Cluster Manager会根据用户提交时设置的CPU和内存等信息为本次提交分配计算资源,启动Executor。
2)Driver会将用户程序划分为不同的执行阶段,每个执行阶段由一组完全相同的Task组成,这些Task分别作用于待处理数据的不同分区。在阶段划分完成和Task创建后,Driver会向Executor发送Task。
3)Executor在接收到Task后,会下载Task的运行时依赖,在准备好Task的执行环境后,会开始执行Task,并且将Task的运行状态汇报给Driver。
4)Driver会根据收到的Task的运行状态来处理不同的状态更新。Task分为两种:一种是Shuffle Map Task,它实现数据的重新洗牌,洗牌的结果保存到Executor所在节点的文件系统中;另外一种是Result Task,它负责生成结果数据。
5)Driver会不断地调用Task,将Task发送到Executor执行,在所有的Task都正确执行或者超过执行次数的限制仍然没有执行成功时停止。
Spark的核心RDD
RDD简介
RDD(Resilient Distributed Dataset,弹性分布式数据集):分布在集群节点上的数据集, 这些集合可以用来进行各种操作。
RDD是Spark操纵数据的一个高度抽象,即Spark所操作的数据集都是包装成RDD来进行操作的。RDD 是一个有容错机制且可以被并行操作的元素集合。
RDD的类型
RDD有2种类型:
1、并行集合(parallelized collections):
接收一个已经存在的 scala 集合,然后进行各种并行计算。
如:val distData=sc.parallelize(Array(1,2,3,4,5))
即创建了一个 Int 数组的 RDD 集合。
distData.collect()
2、 Hadoop 数据集:
是从 Hadoop 上的文件系统创建的 RDD。
可以支持 textfile,sequence file 以及其他任何的 Hadoop 输入格式。
如: val val tfile=sc.textFile(“hdfs://master:9000/tmp/file2.txt”)
一旦创建完成, tfile 可以被进行数据集操作:
val counts = tfile.flatMap(line => line.split(“ “)).map(word => (word,1)).reduceByKey(+)
counts.collect()
RDD支持的操作
RDD 支持2种操作:
1、转换(transformation):从现有的数据集创建一个新的数据集。
2、动作(actions):在数据集上运行计算后,返回一个值给驱动程序。
RDD提供的功能
RDD提供2种功能:
1、断点恢复功能:
当多次计算过程中有某些 RDD 被重复使用或者保持断点恢复功能,为了减少计算代价,可以调用 checkpoint()方法来标记 RDD 是需要被 checkpoint的 , 当下次使用该 RDD 时, 将直接使用该 RDD 的 CheckPointRdd,不再重新计算该 RDD 及其父 RDD。
2、在内存中持久化数据集:
Spark 最重要的一个功能,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当持久化一个 RDD, 每一个结点都将把它的计算分块结果保存在内存中, 并在对此数据集 (或者衍生出的数据集) 进行的其它动作中重用。 这将使得后续的动作(Actions)变得更加迅速 (通常快10倍) 。
缓存是用 Spark 构建迭代算法的关键。
Spark核心组件概述
Spark Streaming实现了实时流处理;
GraphX实现了图计算;
MLlib实现了很多机器学习算法;
Spark SQL实现了基于Spark的交互式查询。
Spark Streaming
Spark Streaming基于Spark Core实现了可扩展、高吞吐和容错的实时数据流处理。
现在支持的数据源有Kafka、Flume、Twitter、ZeroMQ、Ki-nesis、HDFS、S3和TCP socket。处理后的结果可以存储到HDFS、Database或者Dashboard中。
Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark,也就是把Spark Streaming的输入数据按照批处理尺寸(如1秒)分成一段一段的数据(Stream),每一段数据都转换成Spark中的RDD,然后将Spark Streaming中对DStream的转换操作变为针对Spark中对RDD的转换操作,将RDD经过操作变成中间结果保存在内存中。
MLlib
MLlib是Spark对常用的机器学习算法的实现库,同时含有相关的测试和数据生成器,包括分类、回归、聚类、协同过滤、降维(dimensionality re-duction)以及底层基本的优化元素。
现在,MLlib实现了许多常用的算法:
与分类和回归相关的算法包括SVM、逻辑回归、线性回归、朴素贝叶斯分类、决策树等;协同过滤实现了交替最小二乘法(Alternating Least Square,ALS);
聚类实现了K-means、高斯混合(Gaus-sian mixture)、Power Iteration Clustering(PIC)、Latent Dirichlet Allocation(LDA)和Streaming版本的K-means;
降维实现了Singular Value De-composition(SVD)和Principal ComponentAnalysis(PCA);频繁模式挖掘(frequent pat-tern mining)实现了FP-growth。
Spark SQL
Spark SQL支持的数据源:
数据源API通过Spark SQL提供了访问结构化数据的可插拔机制。这使数据源有了简便的途径进行数据转换并加入到Spark平台中。
由API提供的密集优化器集合意味着过滤和列修剪在很多情况下都会被运用于数据源。这些综合的优化极大地减少了需要处理的数据量,因此能够显著提高Spark的工作效率。
数据源API的另一个优点就是不管数据的来源如何,用户都能够通过Spark支持的所有语言来操作这些数据。
此外,Spark SQL可以使用单一接口访问不同数据源的数据。
总之,Spark SQL提供的这些功能进一步统一了大数据分析的解决方案。
GraphX
Spark GraphX是Spark提供的关于图和图并行计算的API,它集ETL、试探性分析和迭代式的图计算于一体,并且在不失灵活性、易用性和容错性的前提下获得了很好的性能。现在GraphX已经提供了很多的算法,新的算法也在不断加入,而且很多的算法都是由Spark的用户贡献的。
Spark支持的三种部署方式
目前Apache Spark支持三种分布式部署方式,分别是standalone、Spark on mesos和 Spark on YARN。
standalone模式。
即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。从一定程度上说,该模式是其他两种的基础。
Spark On Mesos模式。
这是很多公司采用的模式,官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。目前在Spark On Mesos环境中,用户可选择两种调度模式之一运行自己的应用程序:
1)粗粒度模式。2)细粒度模式。
粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。举个例子,比如你提交应用程序时,指定使用5个executor运行你的应用程序,每个executor占用5GB内存和5个CPU,每个executor内部设置了5个slot,则Mesos需要先为executor分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中,mesos的master和slave并不知道executor内部各个task的运行情况,executor直接将任务状态通过内部的通信机制汇报给Driver,从一定程度上可以认为,每个应用程序利用mesos搭建了一个虚拟集群自己使用。
细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。
Spark On YARN模式。
这是一种最有前景的部署模式。但限于YARN自身的发展,目前仅支持粗粒度模式(Coarse-grained Mode)。这是由于YARN上的Container资源是不可以动态伸缩的,一旦Container启动之后,可使用的资源不能再发生变化,不过这个已经在YARN计划。
Spark应用实例
任何Spark程序的编写都是从SparkContext(或用java编写时的JavaSparkContext)开始的。SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的各种参数。
初始化后,我们可用SparkContext对象包含的各种方法来创建和操作分布式数据集和共享变量。Spark shell(在Scala和Python下可以,但不支持Java)能自动完成上述初始化。若要用Scala代码来实现的话,可参照以下代码:
val conf=new SparkConf()
.setAppName(“Test Spark App”)
.setMaster(“local[4]”)
val sc=new SparkContext(conf)
这段代码会创建一个4个线程的SparkContext对象,并将其相应的任务命名为Test Spark APP。我们也可通过如下方式调用SparkContext的简单构造函数,以默认的参数值来创建相应的对象。其效果和上述的完全相同:
val sc=new SparkContext(“local[4]”,”Test Spark App”)
WordCount
读取本地文件:
val file=sc.textFile(“file:/opt/file/file2.txt”)
val count=file.flatMap(line => line.split(“ “)).map(word => (word,1)).reduceByKey(+)//<=>(x,y)=>x+y (x相同则y相加)
count.count()//返回记录的数目
count.collect()//将RDD以Scala集合的形式返回
读取hdfs中的文件:
val tfile=sc.textFile(“hdfs://master:9000/tmp/file1.txt”)
val counts = tfile.flatMap(line => line.split(“ “)).map(word => (word,1)).reduceByKey(+)
counts.count()
counts.collect()
程序解读:
=>是Scala下表示匿名函数的语法。匿名函数指那些没有指定函数名的函数(比如Scala或Python中用def关键字定义的函数)。
语法line=>line.size表示以=>操作符左边的部分作为输入,对其执行一个函数,并以=>操作符右边代码的执行结果为输出。在这个例子中,输入为line,输出则是line.size函数的执行结果。
Estimating Pi
val count = sc.parallelize(1 to 10000).map{i =>
val x = Math.random()
val y = Math.random()
if (xx + yy < 1)
1
else
0}.reduce( + )
4.0 * count / 10000
此算法设计来源于数学经典求pi值之面积比方法。