Spark大数据分析实战.html.pdf

上传人:紫竹语嫣 文档编号:5514558 上传时间:2020-05-27 格式:PDF 页数:42 大小:3.36MB
返回 下载 相关 举报
Spark大数据分析实战.html.pdf_第1页
第1页 / 共42页
Spark大数据分析实战.html.pdf_第2页
第2页 / 共42页
Spark大数据分析实战.html.pdf_第3页
第3页 / 共42页
Spark大数据分析实战.html.pdf_第4页
第4页 / 共42页
Spark大数据分析实战.html.pdf_第5页
第5页 / 共42页
点击查看更多>>
资源描述

《Spark大数据分析实战.html.pdf》由会员分享,可在线阅读,更多相关《Spark大数据分析实战.html.pdf(42页珍藏版)》请在三一文库上搜索。

1、前言 为什么要写这本书 Spark大数据技术还在如火如荼地发展,Spark中国峰会的召开,各地meetup的火爆举行,开源软件Spark也因此水涨船高,很多公司已经将Spark大范围落地并且应用。Spark使用者的需求已经从最 初的部署安装、运行实例,到现在越来越需要通过Spark构建丰富的数据分析应用。写一本Spark实用案例类的技术书籍,是一个持续了很久的想法。由于工作较为紧张,最初只是将参与或学习过的Spark 相关案例进行总结,但是随着时间的推移,最终还是打算将其中通用的算法、系统架构以及应用场景抽象出来,并进行适当简化,也算是一种总结和分享。 Spark发源于美国加州大学伯克利分校A

2、MPLab的大数据分析平台,它立足于内存计算,从多迭代批量处理出发,兼顾数据仓库、流处理和图计算等多种计算范式,是大数据系统领域的全栈计算平 台。Spark当下已成为Apache基金会的顶级开源项目,拥有着庞大的社区支持,生态系统日益完善,技术也逐渐走向成熟。 现在越来越多的同行已经了解Spark,并且开始使用Spark,但是国内缺少一本Spark的实战案例类的书籍,很多Spark初学者和开发人员只能参考网络上零散的博客或文档,学习效率较慢。本书也正 是为了解决上述问题而着意编写。 本书希望带给读者一个系统化的视角,秉承大道至简的主导思想,介绍Spark的基本原理,如何在Spark上构建复杂数

3、据分析算法,以及Spark如何与其他开源系统进行结合构建数据分析应用,让读者 开启Spark技术应用之旅。 本书特色 Spark作为一款基于内存的分布式计算框架,具有简洁的接口,可以快速构建上层数据分析算法,同时具有很好的兼容性,能够结合其他开源数据分析系统构建数据分析应用或者产品。 为了适合读者阅读和掌握知识结构,本书从Spark基本概念和机制介绍入手,结合笔者实践经验讲解如何在Spark之上构建机器学习算法,并最后结合不同的应用场景构建数据分析应用。 读者对象 本书中一些实操和应用章节,比较适数据分析和开发人员,可以作为工作手边书;机器学习和算法方面的章节,比较适合机器学习和算法工程师,可

4、以分享经验,拓展解决问题的思路。 Spark初学者 Spark应用开发人员 Spark机器学习爱好者 开源软件爱好者 其他对大数据技术感兴趣的人员 如何阅读本书 本书分为11章内容。 第1章 从Spark概念出发,介绍Spark的来龙去脉,阐述Spark机制与如何进行Spark编程。 第2章 详细介绍Spark的开发环境配置。 第3章 详细介绍Spark生态系统重要组件Spark SQL、Spark Streaming、GraphX、MLlib的实现机制,为后续使用奠定基础。 第4章 详细介绍如何通过Flume、Kafka、Spark Streaming、HDFS、Flask等开源工具构建实时

5、与离线数据分析流水线。 第5章 从实际出发,详细介绍如何在Azure云平台,通过Node.js、Azure Queue、Azure Table、Spark Streaming、MLlib等组件对用户行为数据进行分析与推荐。 第6章 详细介绍如何通过Twitter API、Spark SQL、Spark Streaming、Cassandra、D3等组件对Twitter进行情感分析与统计分析。 第7章 详细介绍如何通过Scrapy、Kafka、MongoDB、Spark、Spark Streaming、Elastic Search等组件对新闻进行抓取、分析、热点新闻聚类等挖掘工作。 第8章 详细

6、介绍了协同过滤概念和模型,讲解了如何在Spark中实现基于Item-based、User-based和Model-based协同过滤算法的推荐系统。 第9章 详细介绍了社交网络分析的基本概念和经典算法,以及如何利用Spark实现这些经典算法,用于真实网络的分析。 第10章 详细介绍了主题分析模型(LDA),讲解如何在Spark中实现LDA算法,并且对真实的新闻数据进行分析。 第11章 详细介绍了搜索引擎的基本原理,以及其中用到的核心搜索排序相关算法PageRank和Ranking SVM,并讲解了如何在Spark中实现PageRank和Ranking SVM算法,以及如何对真实的 Web数据进

7、行分析。 如果你有一定的经验,能够理解Spark的相关基础知识和使用技巧,那么可以直接阅读第411章。然而,如果你是一名初学者,请一定从第1章的基础知识开始学起。 勘误和支持 由于笔者的水平有限,加之编写时间仓促,书中难免会出现一些错误或者不准确的地方,恳请读者批评指正。如果你有更多的宝贵意见,我们会尽量为读者提供最满意的解答。你也可以通过微博高 彦杰gyj,博客:http:/ 期待能够得到大家的真挚反馈,在技术之路上互勉共进。 致谢 感谢微软亚洲研究院的Thomas先生和Ying Yan,在每一次迷茫时给予我鼓励与支持。 感谢机械工业出版社华章公司的杨福川和高婧雅,在近半年的时间里始终支持我

8、们的写作,你们的鼓励和帮助引导我顺利完成全部书稿。 特别致谢 谨以此书献给我最亲爱的爱人,家人,同事,以及众多热爱大数据技术的朋友们! 高彦杰 第1章 Spark简介 本章主要介绍Spark框架的概念、生态系统、架构及RDD等,并围绕Spark的BDAS项目及其子项目进行了简要介绍。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含 SparkSQL、Spark Streaming、GraphX、MLlib等子项目,本章只进行简要介绍,后续章节会有详细阐述。 1.1 初识Spark Spark是基于内存计算的大数据并行计算框架,因为它基于内存计算,所以提高了在大数据环境下数

9、据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上, 形成集群。 1.Spark执行的特点 Hadoop中包含计算框架MapReduce和分布式文件系统HDFS。 Spark是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存储层,融入Hadoop的生态系统,并弥补MapReduce的不足。 (1)中间结果输出 Spark将执行工作流抽象为通用的有向无环图执行计划(DAG),可以将多Stage的任务串联或者并行执行,而无需将Stage的中间结果输出到HDFS中,类似的引擎包括Flink、Dryad、Tez。 (2)数据格式和内存布局 Sp

10、ark抽象出分布式内存存储结构弹性分布式数据集RDD,可以理解为利用分布式的数组来进行数据的存储。RDD能支持粗粒度写操作,但对于读取操作,它可以精确到每条记录。Spark的特性是能够 控制数据在不同节点上的分区,用户可以自定义分区策略。 (3)执行策略 Spark执行过程中不同Stage之间需要进行Shuffle。Shuffle是连接有依赖的Stage的桥梁,上游Stage输出到下游Stage中必须经过Shuffle这个环节,通过Shuffle将相同的分组数据拆分后聚合到同一 个节点再处理。Spark Shuffle支持基于Hash或基于排序的分布式聚合机制。 (4)任务调度的开销 Spar

11、k采用了事件驱动的类库AKKA来启动任务,通过线程池的复用线程来避免系统启动和切换开销。 2.Spark的优势 Spark的一站式解决方案有很多的优势,分别如下所述。 (1)打造全栈多计算范式的高效数据流水线 支持复杂查询与数据分析任务。在简单的“Map”及“Reduce”操作之外,Spark还支持SQL查询、流式计算、机器学习和图算法。同时,用户可以在同一个工作流中无缝搭配这些计算范式。 (2)轻量级快速处理 Spark代码量较小,这得益于Scala语言的简洁和丰富表达力,以及Spark通过External DataSource API充分利用和集成Hadoop等其他第三方组件的能力。同时S

12、park基于内存计算,可通过中间结果 缓存在内存来减少磁盘I/O以达到性能的提升。 (3)易于使用,支持多语言 Spark支持通过Scala、Java和Python编写程序,这允许开发者在自己熟悉的语言环境下进行工作。它自带了80多个算子,同时允许在Shell中进行交互式计算。用户可以利用Spark像书写单机程序一样 书写分布式程序,轻松利用Spark搭建大数据内存计算平台并充分利用内存计算,实现海量数据的实时处理。 (4)与External Data Source多数据源支持 Spark可以独立运行,除了可以运行在当下的Yarn集群管理之外,它还可以读取已有的任何Hadoop数据。它可以运行

13、多种数据源,比如Parquet、Hive、HBase、HDFS等。这个特性让用户可以轻易 迁移已有的持久化层数据。 (5)社区活跃度高 Spark起源于2009年,当下已有超过600多位工程师贡献过代码。开源系统的发展不应只看一时之快,更重要的是一个活跃的社区和强大的生态系统的支持。 同时也应该看到Spark并不是完美的,RDD模型适合的是粗粒度的全局数据并行计算;不适合细粒度的、需要异步更新的计算。对于一些计算需求,如果要针对特定工作负载达到最优性能,还需要使 用一些其他的大数据系统。例如,图计算领域的GraphLab在特定计算负载性能上优于GraphX,流计算中的Storm在实时性要求很高

14、的场合要更胜Spark Streaming一筹。 1.2 Spark生态系统BDAS 目前,Spark已经发展成为包含众多子项目的大数据计算平台。BDAS是伯克利大学提出的基于Spark的数据分析栈(BDAS)。其核心框架是Spark,同时涵盖支持结构化数据SQL查询与分析的查询引 擎Spark SQL,提供机器学习功能的系统MLBase及底层的分布式机器学习库MLlib,并行图计算框架GraphX,流计算框架Spark Streaming,近似查询引擎BlinkDB,内存分布式文件系统Tachyon,资源 管理框架Mesos等子项目。这些子项目在Spark上层提供了更高层、更丰富的计算范式。

15、 图1-1展现了BDAS的主要项目结构图。 图1-1 伯克利数据分析栈(BDAS)主要项目结构图 下面对BDAS的各个子项目进行更详细的介绍。 (1)Spark Spark是整个BDAS的核心组件,是一个大数据分布式编程框架,不仅实现了MapReduce的算子map函数和reduce函数及计算模型,还提供了更为丰富的算子,例如filter、join、groupByKey等。 Spark将分布式数据抽象为RDD(弹性分布式数据集),并实现了应用任务调度、RPC、序列化和压缩,并为运行在其上层的组件提供API。其底层采用Scala这种函数式语言书写而成,并且所提供的API深 度借鉴函数式的编程思想

16、,提供与Scala类似的编程接口。 图1-2所示即为Spark的处理流程(主要对象为RDD)。 Spark将数据在分布式环境下分区,然后将作业转化为有向无环图(DAG),并分阶段进行DAG的调度和任务的分布式并行处理。 (2)Spark SQL Spark SQL提供在大数据上的SQL查询功能,类似于Shark在整个生态系统的角色,它们可以统称为SQL on Spark。之前,由于Shark的查询编译和优化器依赖Hive,使得Shark不得不维护一套Hive分 支。而Spark SQL使用Catalyst作为查询解析和优化器,并在底层使用Spark作为执行引擎实现SQL的算子。用户可以在Spa

17、rk上直接书写SQL,相当于为Spark扩充了一套SQL算子,这无疑更加丰富了 Spark的算子和功能。同时Spark SQL不断兼容不同的持久化存储(如HDFS、Hive等),为其发展奠定广阔的空间。 图1-2 Spark的任务处理流程图 (3)Spark Streaming Spark Streaming通过将流数据按指定时间片累积为RDD,然后将每个RDD进行批处理,进而实现大规模的流数据处理。其吞吐量能够超越现有主流流处理框架Storm,并提供丰富的API用于流数据计 算。 (4)GraphX GraphX基于BSP模型,在Spark之上封装类似Pregel的接口,进行大规模同步全局的

18、图计算,尤其是当用户进行多轮迭代的时候,基于Spark内存计算的优势尤为明显。 (5)MLlib MLlib是Spark之上的分布式机器学习算法库,同时包括相关的测试和数据生成器。MLlib支持常见的机器学习问题,例如分类、回归、聚类以及协同过滤,同时也包括一个底层的梯度下降优化基础算 法。 1.3 Spark架构与运行逻辑 1.Spark的架构 Driver:运行Application的main()函数并且创建SparkContext。 Client:用户提交作业的客户端。 Worker:集群中任何可以运行Application代码的节点,运行一个或多个Executor进程。 Executo

19、r:运行在Worker的Task执行器,Executor启动线程池运行Task,并且负责将数据存在内存或者磁盘上。每个Application都会申请各自的Executor来处理任务。 SparkContext:整个应用的上下文,控制应用的生命周期。 RDD:Spark的基本计算单元,一组RDD形成执行的有向无环图RDD Graph。 DAG Scheduler:根据Job构建基于Stage的DAG工作流,并提交Stage给TaskScheduler。 TaskScheduler:将Task分发给Executor执行。 SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。 2.运行逻

20、辑 (1)Spark作业提交流程 如图1-3所示,Client提交应用,Master找到一个Worker启动Driver,Driver向Master或者资源管理器申请资源,之后将应用转化为RDD有向无环图,再由DAGScheduler将RDD有向无环图转化为 Stage的有向无环图提交给TaskScheduler,由TaskScheduler提交任务给Executor进行执行。任务执行的过程中其他组件再协同工作确保整个应用顺利执行。 图1-3 Spark架构 (2)Spark作业运行逻辑 如图1-4所示,在Spark应用中,整个执行流程在逻辑上运算之间会形成有向无环图。Action算子触发之后

21、会将所有累积的算子形成一个有向无环图,然后由调度器调度该图上的任务进行运算。Spark 的调度方式与MapReduce有所不同。Spark根据RDD之间不同的依赖关系切分形成不同的阶段(Stage),一个阶段包含一系列函数进行流水线执行。图中的A、B、C、D、E、F,分别代表不同的 RDD,RDD内的一个方框代表一个数据块。数据从HDFS输入Spark,形成RDD A和RDD C,RDD C上执行map操作,转换为RDD D,RDD B和RDD E进行join操作转换为F,而在B到F的过程中又会进行 Shuffle。最后RDD F通过函数saveAsSequenceFile输出保存到HDFS中

22、。 图1-4 Spark执行有向无环图 1.4 弹性分布式数据集 本节将介绍弹性分布式数据集RDD。Spark是一个分布式计算框架,而RDD是其对分布式内存数据的抽象,可以认为RDD就是Spark分布式算法的数据结构,而RDD之上的操作是Spark分布式算法的核 心原语,由数据结构和原语设计上层算法。Spark最终会将算法(RDD上的一连串操作)翻译为DAG形式的工作流进行调度,并进行分布式任务的分发。 1.5 本章小结 本章首先介绍了Spark分布式计算平台的基本概念、原理以及Spark生态系统BDAS之上的典型组件。Spark为用户提供了系统底层细节透明、编程接口简洁的分布式计算平台。Sp

23、ark具有内存计算、实 时性高、容错性好等突出特点。同时本章介绍了Spark的计算模型,Spark会将应用程序整体翻译为一个有向无环图进行调度和执行。相比MapReduce,Spark提供了更加优化和复杂的执行流。读者还可 以深入了解Spark的运行机制与Spark算子,这样能更加直观地了解API的使用。Spark提供了更加丰富的函数式算子,这样就为Spark上层组件的开发奠定了坚实的基础。 相信读者已经想了解如何开发Spark程序,接下来将就Spark的开发环境配置进行阐述。 第2章 Spark开发与环境配置 用户进行Spark应用程序开发,一般在用户本地进行单机开发调试,之后再将作业提交到

24、集群生产环境中运行。下面将介绍Spark开发环境的配置,如何编译和进行源码阅读环境的配置。 用户可以在官网上下载最新的AS软件包,网址为:http:/spark.apache.org/。 2.1 Spark应用开发环境配置 Spark的开发可以通过Intellij或者Eclipse IDE进行,在环境配置的开始阶段,还需要安装相应的Scala插件。 2.2 远程调试Spark程序 本地调试Spark程序和传统的调试单机的Java程序基本一致,读者可以参照原来的方式进行调试,关于单机调试本书暂不赘述。对于远程调试服务器上的Spark代码,首先请确保在服务器和本地的 Spark版本一致。需要按前文

25、介绍预先安装好JDK和Git。 (1)编译Spark 在服务器端和本地计算机下载Spark项目。 通过下面的命令克隆一份Spark源码: git clone https:/ 然后针对指定的Hadoop版本进行编译: SPARK_HADOOP_VERSION=2.3.0 sbt/sbt assembly (2)在服务器端的配置 1)根据相应的Spark配置指定版本的Hadoop,并启动Hadoop。 2)对编译好的Spark进行配置,在conf/spark-env.sh文件中进行如下配置: export SPARK_JAVA_OPTS=“-agentlib:jdwp=transport=dt_

26、socket,server=y,suspend=y,address=9999“ 其中“suspend=y”设置为需要挂起的模式。这样,当启动Spark的作业时候程序会自动挂起,等待本地的IDE附加(Attach)到被调试的应用程序上。address是开放等待连接的端口号。 (3)启动Spark集群和应用程序 1)启动Spark集群: ./sbin/start-all.sh 2)启动需要调试的程序,以Spark中自带的HdfsWordCount为例: MASTER=spark:/ 10.10.1.168:7077 ./bin/run-example org.apache.spark.exampl

27、es.streaming.HdfsWordCount hdfs:/ localhost:9000/test/test.txt 3)如图2-7所示,执行后程序会挂起并等待本地的Intellij进行连接,并显示“Listening for transport dt_socket at address: 9999”: 图2-7 远程调试 (4)本地IDE配置 1)配置并连接服务器端挂起的程序。 在Intellij中选择“run”“edit configuration”“remote”命令,在弹出的对话框中将默认配置中的端口号和IP改为服务器的地址,同时选择附加(Attach)方式,如图2-8所示。

28、图2-8 远程调试设置 2)在“Run/Debug Configurations”对话框中填入需要连接的主机名和端口号以及其他参数,如图2-8所示。 3)在程序中设置断点进行调试。 通过上面的介绍,用户可以了解如何进行远程调试。对于单机调试方式则和日常开发的单机程序一样,常用方式是设置单机调试断点之后再进行调试,在这里并不再展开介绍。 2.3 Spark编译 用户可以通过Spark的默认构建工具SBT进行源码的编译和打包。当用户需要对源码进行二次开发时,则需要对源码进行增量编译,通过下面的方式读者可以实现编译和增量编译。 (1)克隆Spark源码 可通过克隆的方式克隆Spark源码,如图2-9

29、所示。 git clone https:/ 图2-9 git clone Spark库 这样将会从github将Spark源码下载到本地,建立本地的仓库。 (2)编译Spark源码 在Spark项目的根目录内执行编译和打包命令(如图2-10所示)。 sbt/sbt assembly 执行过程中会解析依赖和下载需要的依赖jar包。执行完成后会将所有jar包打包为一个jar包,用户便可以运行Spark集群和示例了。 (3)增量编译 在有些情况下,用户需要修改源码,修改之后如果每次都重新下载jar包或者对全部源码重新编译一遍,会很浪费时间,用户通过下面的增量编译方法,可以只对改变的源码进行编译。

30、编译打包一个assembly的jar包。 $ sbt/sbt clean assembly 图2-10 编译Spark源码 这时的Spark程序已经可以运行。用户可以进入spark-shell执行程序。 $ ./bin/spark-shell 配置export SPARK_PREPEND_CLASSES参数为true,开启增量编译模式。 $ export SPARK_PREPEND_CLASSES=true 继续使用spark-shell中的程序: $ ./bin/spark-shell 这时用户可以对代码进行修改和二次开发:初始开发Spark应用,之后编译。 编译Spark源码: $ sbt

31、/sbt compile 继续开发Spark应用,之后编译。 $ sbt/sbt compile 解除增量编译模式: $ unset SPARK_PREPEND_CLASSES 返回正常使用spark-shell的情景。 $ ./bin/spark-shell # Back to normal, using Spark classes from the assembly Jar 如果用户不想每次都开启一个新的SBT会话,可以在compile命令前加上。 $ sbt/sbt compile (4)查看Spark源码依赖图 如果使用SBT进行查看依赖图(如图2-11所示),用户需要运行下面的命令:

32、 $ # sbt $ sbt/sbt dependency-tree 如果使用Maven进行查看依赖图(如图2-11所示),用户需要运行下面的命令: $ # Maven $ mvn -DskipTests install $ mvn dependency:tree 图2-11 查看依赖图 2.4 配置Spark源码阅读环境 由于Spark使用SBT作为项目管理构建工具,SBT的配置文件中配置了依赖的jar包网络路径,在编译或者生成指定类型项目时需要从网络下载jar包。需要用户预先安装git。在Linux操作系统或者 Windows操作系统上(用户可以下载Git Shell,在Git Shell

33、中进行命令行操作)通过“sbt/sbt gen-idea”命令,生成Intellij项目文件,然后在Intellij IDE中直接通过“Open Project”打开项目。 克隆Spark源码: git clone https:/ 在所需要的软件安装好后在spark源代码根目录下,输入以下命令生成Intellij项目: sbt/sbt gen-idea 这样SBT会自动下载依赖包和进行源文件编译以及生成Intellij所需要的项目文件。 2.5 本章小结 本章首先介绍了Spark应用程序的开发流程以及如何编译和调试Spark程序。用户可以选用对Scala项目能够很好支持的Intellij I

34、DE。如果用户想深入了解Spark,以及诊断问题,建议读者配置好源码阅 读环境,进行源码分析。 通过本章的介绍,读者可以进行Spark开发环境的搭建,以及程序的开发,后续将介绍Spark的生态系统BDAS。 第3章 BDAS简介 提到Spark不得不说伯克利大学AMPLab开发的BDAS(Berkeley Data Analytics Stack)数据分析的软件栈,如图3-1所示是其中的Spark生态系统。其中用内存分布式大数据计算引擎Spark替代原有 的MapReduce,上层通过Spark SQL替代Hive等SQL on Hadoop系统,Spark Streaming替换Storm等

35、流式计算框架,GraphX替换GraphLab等大规模图计算框架,MLlib替换Mahout等机器学习框架 等,其整体框架基于内存计算解决了原来Hadoop的性能瓶颈问题。AmpLab提出One Framework to Rule Them All的理念,用户可以利用Spark一站式构建自己的数据分析流水线。 图3-1 Spark生态系统 在一些数据分析应用中,用户可以使用Spark SQL预处理结构化数据,GraphX预处理图数据,Spark Streaming实时捕获和处理流数据,最终通过MLlib将数据融合,进行模型训练,底层各个系统通 过Spark进行运算。 下面将介绍其中主要的项目。

36、 3.1 SQL on Spark1 AMPLab将大数据分析负载分为三大类型:批量数据处理、交互式查询、实时流处理。而其中很重要的一环便是交互式查询。大数据分析栈中需要满足用户ad-hoc、reporting、iterative等类型的查询 需求,也需要提供SQL接口来兼容原有数据库用户的使用习惯,同时也需要SQL能够进行关系模式的重组。完成这些重要的SQL任务的便是Spark SQL和Shark这两个开源分布式大数据查询引擎,它们可以 理解为轻量级Hive SQL在Spark上的实现,业界将该类技术统称为SQL on Hadoop。 在Spark峰会2014上,Databricks宣布不再

37、支持Shark的开发,全力以赴开发Shark的下一代技术Spark SQL,同时Hive社区也启动了Hive on Spark项目,将Spark作为Hive(除MapReduce和Tez之 外的)新执行引擎。根据伯克利的Big Data Benchmark测试对比数据,Shark的In Memory性能可以达到Hive的100倍,即使是On Disk也能达到10倍的性能提升,是Hive强有力的替代解决方案。而作为 Shark的进化版本的Spark SQL,在AMPLab最新的测试中的性能已经超过Shark。图3-2展示了Spark SQL和Hive on Spark是新的发展方向。 图3-2

38、Spark SQL和Hive on Spark是新的发展方向 1 参考文章:高彦杰,陈冠诚Spark SQL:基于内存的大数据分析引擎程序员2014.8 3.2 Spark Streaming Spark Streaming是一个批处理的流式计算框架。它的核心执行引擎是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错性。下面将对Spark Streaming进行详细的介绍。 3.3 GraphX GraphX是Spark中的一个重要子项目,它利用Spark作为计算引擎,实现了大规模图计算的功能,并提供了类似Pregel的编程接口。GraphX的出现,将Spark生态系统变得更加

39、完善和丰富;同时以其 与Spark生态系统其他组件很好的融合,以及强大的图数据处理能力,在工业界得到了广泛的应用。本章主要介绍GraphX的架构、原理和使用方式。 3.4 MLlib MLlib是构建在Spark上的分布式机器学习库,充分利用了Spark的内存计算和适合迭代型计算的优势,将性能大幅度提升。同时由于Spark算子丰富的表现力,让大规模机器学习的算法开发不再复 杂。 3.5 本章小结 本章主要介绍了BDAS中广泛应用的几个数据分析组件。SQL on Spark提供在Spark上的SQL查询功能。让用户可以基于内存计算和SQL进行大数据分析。通过Spark Streaming,用户可

40、以构建实时流 处理应用,其高吞吐量,以及适合历史和实时数据混合分析的特性使其在流数据处理框架中突出重围。GraphX充当Spark生态系统中图计算的角色,其简洁的API让图处理算法的书写更加便捷。最后介绍 了MLlibSpark上的机器学习库,它充分利用Spark内存计算和适合迭代的特性,使分布式系统与并行机器学习算法实现了完美的结合。相信随着Spark生态系统的日臻完善,这些组件还会取得长足发 展。 第4章 Lamda架构日志分析流水线 4.1 日志分析概述 随着互联网的发展,在互联网上产生了大量的Web日志或移动应用日志,日志包含用户最重要的信息,通过日志分析,用户可以获取到网站或应用的访

41、问量,哪个网页访问人数最多,哪个网页最有价 值、用户的特征、用户的兴趣等。 一般中型的网站(10万的PV1以上),每天会产生1GB以上Web日志文件。大型或超大型的网站,可能每小时就会产生500GB1TB的数据量。 对于日志的这种规模的数据,通过Spark进行大规模日志分析与日志处理,能够达到很好的效果。 Web日志由Web服务器产生,现在互联网公司使用的主流的服务器可能是Nginx、Apache、Tomcat等。从Web日志中,我们可以获取网站每类页面的PV值(页面浏览)、UV(独立IP数)。更复杂 一些的,可以计算得出用户所检索的关键词排行榜、用户停留时间最高的页面等。更为复杂的,构建广告

42、点击模型、分析用户行为特征等。 1.日志格式 目前常见的Web日志格式主要由两类:一种日志格式是Apache的NCSA日志格式,另一种日志格式是IIS的W3C日志格式。 下面以Nginx日志格式为例进行讲解。 Nginx日志示例格式: 222.68.172.111 - - 18/Sep/2013:06:49:57 +0000 “GET /images/my.jpg HTTP/1.1“ 200 19939 “http:/ “Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66

43、 Safari/537.36“ 以下是本例中涉及的一些要素。 remote_addr:记录客户端的IP地址。本例为222.68.172.111。 remote_user:记录客户端用户名称,本例-表示为空。 time_local:记录访问时间与时区,本例为18/Sep/2013:06:49:57+0000。 request:记录请求的URL与HTTP协议,本例为GET/images/my.jpg HTTP/1.1。 status:记录请求状态,成功是200。 body_bytes_sent:记录发送给客户端文件主体内容大小,本例中为19939。 http_referer:用来记录从哪个页面链接

44、访问过来的,http:/ http_user_agent:记录客户浏览器的相关信息,本例中为Mozilla/5.0(Windows NT 6.1)AppleWebKit/537.36(KHTML,like Gecko)Chrome/29.0.1547.66 Safari/537.36。 注意 如果用户想要更多的信息,则要用其他手段去获取,通过JS代码单独发送请求,并使用cookies记录用户的访问信息。 通过利用这些日志信息,我们可以深入分析用户行为或网站状况了。 2.传统单机日志数据分析示例 当数据量较小(10MB,100MB,10GB),单机处理能够解决,可以通过各种Unix/Linux命

45、令或者工具,awk、grep、sort、join等都是日志分析的利器,再配合Perl、Python、正则表达式,基本就 可以解决常见日志分析的问题。 (1)Linux Shell进行单机日志分析示例 例如,想从上面提到的nginx日志中得到访问量最高的前10个IP,通过以下Shell进行分析: cat access.log.10 | awk a$1+ END for(b in a) print b“t“ab | sort -k2 -r | head -n 10 163.177.71.12 972 101.226.68.137 972 183.195.232.138 971 50.116.27.

46、194 97 14.17.29.86 96 61.135.216.104 94 61.135.216.105 91 61.186.190.41 9 59.39.192.108 9 220.181.51.212 9 (2)Python进行单机日志分析示例 检查Nginx的日志文件,统计基于每个独立IP地址的点击率,代码如下: #!/usr/bin/env python#coding:utf8 import re import sys contents = sys.argv1def NginxIpHite(logfile_path): #IP:4个字符串,每个字符串为13个数字,由点连接 ipad

47、d = r.join(rd1,3*4) re_ip = pile(ipadd) iphitlisting = for line in open(contents): match = re_ip.match(line) if match: ip = match.group( ) #如果IP存在增加1,否则设置点击率为1 iphitlistingip = iphitlisting.get(ip, 0) + 1 print iphitlisting NginxIpHite(contents) 运行并打印结果如下: rootchlinux 06# ./nginx_ip.py access_201406

48、10.log 183.3.121.84: 1, 182.118.20.184: 2, 182.118.20.185: 1, 190.52.120.38: 1, 182.118.20.187: 1, 202.108.251.214: 2, 61.135.190.101: 2, 103.22.181.247: 1, 101.226.33.190: 3, 183.129.168.131: 1, 66.249.73.29: 26, 182.118.20.202: 1, 157.56.93.38: 2, 219.139.102.237: 4, 220.181.108.178: 1, 220.181.108.179: 1, 182.118.25.233: 4, 182.118.25.232: 1, 182.118.25.231: 2, 182.118.20.186: 1, 174.129.228.67: 20 此脚本返回的是一个Key-Value映射,包含访问Nginx服务器的各个IP的点击数。用户可以通过这个示例再进行深入拓展,进行更丰富的日志信息和知识的获取。 (3)大规模分布式日志分析情况 当数据量每天以10GB、100GB增长的时候,单机处理能力已经不能满足需求。此时就需要增加系统的扩展性

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 建筑/环境 > 建筑资料


经营许可证编号:宁ICP备18001539号-1