订阅
  捐助
Apache Spark大数据分析入门(一)
 
作者:Rick Hightower   来源: DZone   火龙果软件  发布于 2015-11-27
  2712  次浏览      15
 

摘要:Apache Spark的出现让普通人也具备了大数据及实时数据分析能力。鉴于此,本文通过动手实战操作演示带领大家快速地入门学习Spark。本文是Apache Spark入门系列教程(共四部分)的第一部分。

ApacheSpark的出现让普通人也具备了大数据及实时数据分析能力。鉴于此,本文通过动手实战操作演示带领大家快速地入门学习Spark。本文是ApacheSpark入门系列教程(共四部分)的第一部分。

全文共包括四个部分:

  • 第一部分:Spark入门,介绍如何使用Shell及RDDs
  • 第二部分:介绍SparkSQL、Dataframes及如何结合Spark与Cassandra一起使用
  • 第三部分:介绍SparkMLlib和SparkStreaming
  • 第四部分:介绍SparkGraphx图计算

本篇讲解的便是第一部分

关于全部摘要和提纲部分,请登录我们的网站ApacheSparkQuickStartforreal-timedata-analytics进行访问。

在网站上你可以找到更多这方面的文章和教程,例如:JavaReactiveMicroserviceTraining,MicroservicesArchitecture|ConsulServiceDiscoveryandHealthForMicroservicesArchitectureTutorial。还有更多的其它内容,感兴趣的可以去查看。

Spark概述

ApacheSpark是一个正在快速成长的开源集群计算系统,正在快速的成长。ApacheSpark生态系统中的包和框架日益丰富,使得Spark能够进行高级数据分析。ApacheSpark的快速成功得益于它的强大功能和易于使用性。相比于传统的MapReduce大数据分析,Spark效率更高、运行时速度更快。ApacheSpark提供了内存中的分布式计算能力,具有Java、Scala、Python、R四种编程语言的API编程接口。Spark生态系统如下图所示:


Display-Edit

整个生态系统构建在Spark内核引擎之上,内核使得Spark具备快速的内存计算能力,也使得其API支持Java、Scala,、Python、R四种编程语言。Streaming具备实时流数据的处理能力。SparkSQL使得用户使用他们最擅长的语言查询结构化数据,DataFrame位于SparkSQL的核心,DataFrame将数据保存为行的集合,对应行中的各列都被命名,通过使用DataFrame,可以非常方便地查询、绘制和过滤数据。MLlib为Spark中的机器学习框架。Graphx为图计算框架,提供结构化数据的图计算能力。以上便是整个生态系统的概况。

ApacheSpark的发展历史

  • 最初由加州伯克利大学(UCBerkeley)AMPlab实验室开发并于2010年开源,目前已经成为阿帕奇软件基金会(ApacheSoftwareFoundation)的顶级项目。
  • 已经有12,500次代码提交,这些提交来自630个源码贡献者(参见ApacheSparkGithubrepo)
  • 大部分代码使用Scala语言编写。
  • ApacheSpark的Google兴趣搜索量(Googlesearchinterests)最近呈井喷式的增长,这表明其关注度之高(Google广告词工具显示:仅七月就有多达108,000次搜索,比Microservices的搜索量多十倍)

  • 部分Spark的源码贡献者(distributors)分别来自IBM、Oracle、DataStax、BlueData、Cloudera……
  • 构建在Spark上的应用包括:Qlik、Talen、Tresata、atscale、platfora……
  • 使用Spark的公司有:Verizon Verizon、NBC、Yahoo、Spotify……

大家对ApacheSpark如此感兴趣的原因是它使得普通的开发具备Hadoop的数据处理能力。较之于Hadoop,Spark的集群配置比Hadoop集群的配置更简单,运行速度更快且更容易编程。Spark使得大多数的开发人员具备了大数据和实时数据分析能力。鉴于此,鉴于此,本文通过动手实战操作演示带领大家快速地入门学习ApacheSpark。

下载Spark并河演示如何使用交互式Shell命令行

动手实验ApacheSpark的最好方式是使用交互式Shell命令行,Spark目前有PythonShell和ScalaShell两种交互式命令行。

可以从这里下载ApacheSpark,下载时选择最近预编译好的版本以便能够立即运行shell。

目前最新的ApacheSpark版本是1.5.0,发布时间是2015年9月9日。

tar -xvzf ~/spark-1.5.0-bin-hadoop2.4.tgz

运行PythonShell

cd spark-1.5.0-bin-hadoop2.4  ./bin/pyspark

在本节中不会使用PythonShell进行演示。

Scala交互式命令行由于运行在JVM上,能够使用java库。

运行ScalaShell

cd spark-1.5.0-bin-hadoop2.4  ./bin/spark-shell

执行完上述命令行,你可以看到下列输出:

ScalaShell欢迎信息

Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.5.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_25)
Type in expressions to have them evaluated.
Type :help for more information.
15/08/24 21:58:29 INFO SparkContext: Running Spark version 1.5.0

下面是一些简单的练习以便帮助使用shell。也许你现在不能理解我们做的是什么,但在后面我们会对此进行详细分析。在ScalaShell中,执行下列操作:

在Spark中使用README文件创建textFileRDD

val textFile = sc.textFile("README.md")

获取textFileRDD的第一个元素

textFile.first()
res3: String = # Apache Spark

对textFileRDD中的数据进行过滤操作,返回所有包含“Spark”关键字的行,操作完成后会返回一个新的RDD,操作完成后可以对返回的RDD的行进行计数

筛选出包括Spark关键字的RDD然后进行行计数

val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark.count()
res10: Long = 19

要找出RDDlinesWithSpark单词出现最多的行,可以使用下列操作。使用map方法,将RDD中的各行映射成一个数,然后再使用reduce方法找出包含单词数最多的行。

找出RDDtextFile中包含单词数最多的行

textFile.map(line => line.split(" ").size)
.reduce((a, b) => if (a > b) a else b)
res11: Int = 14

返回结果表明第14行单词数最多。

也可以引入其它java包,例如Math.max()方法,因为map和reduce方法接受scala函数字面量作为参数。

在scalashell中引入Java方法

import java.lang.Math
textFile.map(line => line.split(" ").size)
.reduce((a, b) => Math.max(a, b))
res12: Int = 14

我们可以很容易地将数据缓存到内存当中。

将RDDlinesWithSpark缓存,然后进行行计数

linesWithSpark.cache()
res13: linesWithSpark.type =
MapPartitionsRDD[8] at filter at <console>:23
linesWithSpark.count()
res15: Long = 19

上面简要地给大家演示的了如何使用Spark交互式命令行。

弹性分布式数据集(RDDs)

Spark在集群中可以并行地执行任务,并行度由Spark中的主要组件之一——RDD决定。弹性分布式数据集(Resilientdistributeddata,RDD)是一种数据表示方式,RDD中的数据被分区存储在集群中(碎片化的数据存储方式),正是由于数据的分区存储使得任务可以并行执行。分区数量越多,并行越高。下图给出了RDD的表示:


Display-Edit

想像每列均为一个分区(partition),你可以非常方便地将分区数据分配给集群中的各个节点。

为创建RDD,可以从外部存储中读取数据,例如从Cassandra、Amazon简单存储服务(AmazonSimpleStorageService)、HDFS或其它Hadoop支持的输入数据格式中读取。也可以通过读取文件、数组或JSON格式的数据来创建RDD。另一方面,如果对于应用来说,数据是本地化的,此时你仅需要使用parallelize方法便可以将Spark的特性作用于相应数据,并通过ApacheSpark集群对数据进行并行化分析。为验证这一点,我们使用ScalaSparkShell进行演示:

通过单词列表集合创建RDDthingsRDD

val thingsRDD = sc.parallelize(List("spoon", "fork", "plate", "cup", "bottle"))
thingsRDD: org.apache.spark.rdd.RDD[String] =
ParallelCollectionRDD[11] at parallelize at <console>:24

计算RDDthingsRDD中单的个数

thingsRDD.count()
res16: Long = 5
运行Spark时,需要创建SparkContext。使用SparkShell交互式命令行时,SparkContext会自动创建。当调用SparkContext对象的parallelize方法后,我们会得到一个经过分区的RDD,这些数据将被分发到集群的各个节点上。

使用RDD我们能够做什么?

对RDD,既可以进行数据转换,也可以对进行action操作。这意味着使用transformation可以改变数据格式、进行数据查询或数据过滤操作等,使用action操作,可以触发数据的改变、抽取数据、收集数据甚至进行计数。

例如,我们可以使用Spark中的文本文件README.md创建一个RDDtextFile,文件中包含了若干文本行,将该文本文件读入RDDtextFile时,其中的文本行数据将被分区以便能够分发到集群中并被并行化操作。

根据README.md文件创建RDDtextFile

val textFile = sc.textFile("README.md")

行计数

textFile.count()
res17: Long = 98

README.md 文件中有98行数据。

得到的结果如下图所示:


Display-Edit

然后,我们可以将所有包含Spark关键字的行筛选出来,完成操作后会生成一个新的RDDlinesWithSpark:

创建一个过滤后的RDDlinesWithSpark

val linesWithSpark = textFile.filter(line => line.contains("Spark"))

在前一幅图中,我们给出了textFileRDD的表示,下面的图为RDDlinesWithSpark的表示:


Display-Edit

值得注意的是,Spark还存在键值对RDD(PairRDD),这种RDD的数据格式为键/值对数据(key/valuepaireddata)。例如下表中的数据,它表示水果与颜色的对应关系:


Display-Edit

对表中的数据使用groupByKey()转换操作将得到下列结果:

groupByKey()转换操作

pairRDD.groupByKey()
Banana [Yellow]
Apple [Red, Green]
Kiwi [Green]
Figs [Black]

该转换操作只将键为Apple,值为Red和Green的数据进行了分组。这些是到目前为止给出的转换操作例子。

当得到一个经过过滤操作后的RDD,可以collect/materialize相应的数据并使其流向应用程序,这是action操作的例子。经过此操作后,RDD中所有数据将消失,但我们仍然可以在RDD的数据上进行某些操作,因为它们仍然在内存当中。

Collect或materializelinesWithSparkRDD中的数据

linesWithSpark.collect()

值得一提的是每次进行Sparkaction操作时,例如count()action操作,Spark将重新启动所有的转换操作,计算将运行到最后一个转换操作,然后count操作返回计算结果,这种运行方式速度会较慢。为解决该问题和提高程序运行速度,可以将RDD的数据缓存到内存当中,这种方式的话,当你反复运行action操作时,能够避免每次计算都从头开始,直接从缓存到内存中的RDD得到相应的结果。

缓存RDDlinesWithSpark

linesWithSpark.cache()

如果你想将RDDlinesWithSpark从缓存中清除,可以使用unpersist()方法。

将linesWithSpark从内存中删除

linesWithSpark.unpersist()

如果不手动删除的话,在内存空间紧张的情况下,Spark会采用最近最久未使用(leastrecentlyusedlogic,LRU)调度算法删除缓存在内存中最久的RDD。

下面总结一下Spark从开始到结果的运行过程:

  • 创建某种数据类型的RDD
  • 对RDD中的数据进行转换操作,例如过滤操作
  • 在需要重用的情况下,对转换后或过滤后的RDD进行缓存
  • 在RDD上进行action操作,例如提取数据、计数、存储数据到Cassandra等。

下面给出的是RDD的部分转换操作清单:

  • filter()
  • map()
  • sample()
  • union()
  • groupbykey()
  • sortbykey()
  • combineByKey()
  • subtractByKey()
  • mapValues()
  • Keys()
  • Values()

下面给出的是RDD的部分action操作清单:

  • collect()
  • count()
  • first()
  • countbykey()
  • saveAsTextFile()
  • reduce()
  • take(n)
  • countBykey()
  • collectAsMap()
  • lookup(key)

关于RDD所有的操作清单和描述,可以参考Sparkdocumentation

结束语

本文介绍了ApacheSpark,一个正在快速成长、开源的集群计算系统。我们给大家展示了部分能够进行高级数据分析的ApacheSpark库和框架。对ApacheSpark为什么会如此成功的原因进行了简要分析,具体表现为ApacheSpark的强大功能和易用性。给大家演示了ApacheSpark提供的内存、分布式计算环境,并演示了其易用性及易掌握性。

在本系列教程的第二部分,我们对Spark进行更深入的介绍。

 

   
2712 次浏览       15
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新课程计划
信息架构建模(基于UML+EA)3-21[北京]
软件架构设计师 3-21[北京]
图数据库与知识图谱 3-25[北京]
业务架构设计 4-11[北京]
SysML和EA系统设计与建模 4-22[北京]
DoDAF规范、模型与实例 5-23[北京]

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...