您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
使用Apache Spark构建实时分析Dashboard
 
来源:infoqQ 发布于: 2016-12-9
  1676  次浏览      
 

本篇文章中我们将学习如何使用Apache Spark streaming,Kafka,Node.js,Socket.IO和Highcharts构建实时分析Dashboard。

问题描述

电子商务门户希望构建一个实时分析仪表盘,对每分钟发货的订单数量做到可视化,从而优化物流的效率。

解决方案

解决方案之前,先快速看看我们将使用的工具:

Apache Spark– 一个通用的大规模数据快速处理引擎。Spark的批处理速度比Hadoop MapReduce快近10倍,而内存中的数据分析速度则快近100倍。 更多 关于Apache Spark的信息。

Python– Python是一种广泛使用的高级,通用,解释,动态编程语言。 更多 关于Python的信息。

Kafka– 一个高吞吐量,分布式消息发布订阅系统。 更多 关于Kafka的信息。

Node.js– 基于事件驱动的I/O服务器端JavaScript环境,运行在V8引擎上。 更多 关于Node.js的信息。

Socket.io– Socket.IO是一个构建实时Web应用程序的JavaScript库。它支持Web客户端和服务器之间的实时、双向通信。 更多 关于Socket.io的信息。

Highcharts– 网页上交互式JavaScript图表。 更多 关于Highcharts的信息。

CloudxLab– 提供一个真实的基于云的环境,用于练习和学习各种工具。你可以通过在线 注册 立即开始练习。

如何构建数据Pipeline?

下面是数据Pipeline高层架构图

数据Pipeline

实时分析Dashboard

让我们从数据Pipeline中的每个阶段的描述开始,并完成解决方案的构建。

阶段1

当客户购买系统中的物品或订单管理系统中的订单状态变化时,相应的订单ID以及订单状态和时间将被推送到相应的Kafka主题中。

数据集

由于没有真实的在线电子商务门户网站,我们准备用CSV文件的数据集来模拟。让我们看看数据集:

DateTime, OrderId, Status
2016-07-13 14:20:33,xxxxx-xxx,processing
2016-07-13 14:20:34,xxxxx-xxx,shipped
2016-07-13 14:20:35,xxxxx-xxx,delivered

数据集包含三列分别是:“DateTime”、“OrderId”和“Status”。数据集中的每一行表示特定时间时订单的状态。这里我们用“xxxxx-xxx”代表订单ID。我们只对每分钟发货的订单数感兴趣,所以不需要实际的订单ID。

数据集位于项目的 spark-streaming/data/order_data 文件夹中。

推送数据集到Kafka

shell脚本将从这些CSV文件中分别获取每一行并推送到Kafka。推送完一个CSV文件到Kafka之后,需要等待1分钟再推送下一个CSV文件,这样可以模拟实时电子商务门户环境,这个环境中的订单状态是以不同的时间间隔更新的。在现实世界的情况下,当订单状态改变时,相应的订单详细信息会被推送到Kafka。

运行我们的shell脚本将数据推送到Kafka主题中。登录到 CloudxLab Web控制台 并运行以下命令。

# Clone the repository

git clone https://github.com/singhabhinav/cloudxlab.git

# Create the order-data topic in Kafka

export PATH=$PATH:/usr/hdp/current/kafka-broker/bin
kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic order-data

# Go to Kafka directory

cd cloudxlab/spark-streaming/kafka

# Run the Script for pushing data to Kafka topic
# ip-172-31-13-154.ec2.internal is the hostname of broker.
# Find list of brokers in Ambari (a.cloudxlab.com:8080).
# Use hostname of any one of the brokers
# order-data is the Kafka topic

/bin/bash put_order_data_in_topic.sh ../data/order_data/
ip-172-31-13-154.ec2.internal:6667 order-data

# Script will push CSV files one by one to Kafka topic
after every one minute interval

# Let the script run. Do not close the terminal

阶段2

在第1阶段后,Kafka“order-data”主题中的每个消息都将如下所示

2016-07-13 14:20:33,xxxxx-xxx,processing

阶段3

Spark streaming代码将在60秒的时间窗口中从“order-data”的Kafka主题获取数据并处理,这样就能在该60秒时间窗口中为每种状态的订单计数。处理后,每种状态订单的总计数被推送到“order-one-min-data”的Kafka主题中。

请在Web控制台中运行这些Spark streaming代码

# Login to CloudxLab web console in the second tab

# Create order-one-min-data Kafka topic

export PATH=$PATH:/usr/hdp/current/kafka-broker/bin
kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic order-one-min-data

# Go to spark directory

cd cloudxlab/spark-streaming/spark

# Run the Spark Streaming code

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar
spark_streaming_order_status.py localhost:2181 order-data

# Let the script run. Do not close the terminal

阶段4

在这个阶段,Kafka主题“order-one-min-data”中的每个消息都将类似于以下JSON字符串

{
"shipped": 657,
"processing": 987,
"delivered": 1024
}

阶段5

运行Node.js server

现在我们将运行一个node.js服务器来使用“order-one-min-data”Kafka主题的消息,并将其推送到Web浏览器,这样就可以在Web浏览器中显示出每分钟发货的订单数量。

请在Web控制台中运行以下命令以启动node.js服务器

# Login to CloudxLab web console in the third tab

# Go to node directory

cd cloudxlab/spark-streaming/node

# Install dependencies as specified in package.json

npm install

# Run the node server

node index.js

# Let the server run. Do not close the terminal

现在node服务器将运行在端口3001上。如果在启动node服务器时出现“EADDRINUSE”错误,请编辑index.js文件并将端口依次更改为3002...3003...3004等。请使用3001-3010范围内的任意可用端口来运行node服务器。

用浏览器访问

启动node服务器后,请转到 http://YOUR_WEB_CONSOLE:PORT_NUMBER 访问实时分析Dashboard。如果您的Web控制台是f.cloudxlab.com,并且node服务器正在端口3002上运行,请转到 http://f.cloudxlab.com:3002 访问Dashboard。

当我们访问上面的URL时,socket.io-client库被加载到浏览器,它会开启服务器和浏览器之间的双向通信信道。

阶段6

一旦在Kafka的“order-one-min-data”主题中有新消息到达,node进程就会消费它。消费的消息将通过socket.io发送给Web浏览器。

阶段7

一旦web浏览器中的socket.io-client接收到一个新的“message”事件,事件中的数据将会被处理。如果接收的数据中的订单状态是“shipped”,它将会被添加到HighCharts坐标系上并显示在浏览器中。

截图

我们已成功构建实时分析Dashboard。这是一个基本示例,演示如何集成Spark-streaming,Kafka,node.js和socket.io来构建实时分析Dashboard。现在,由于有了这些基础知识,我们就可以使用上述工具构建更复杂的系统。

   
1676 次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新课程计划

APP推广之巧用工具进行数据分析
Hadoop Hive基础sql语法
应用多级缓存模式支撑海量读服务
HBase 超详细介绍
HBase技术详细介绍
Spark动态资源分配
更多...   

Hadoop与Spark大数据架构
Hadoop原理与高级实践
Hadoop原理、应用与优化
大数据体系框架与应用
大数据的技术与实践
Spark大数据处理技术

GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...