Python 海量数据处理之 _Hadoop&Spark

1. 说明

  前篇介绍了安装和使用 Hadoop,本篇将介绍 Hadoop+Spark 的安装配置及如何用 Python 调用 Spark。

  当数据以 TB,PB 计量时,用单机处理数据变得非常困难,于是使用 Hadoop 建立计算集群处理海量数据,Hadoop 分为两部分,一部分是数据存储 HDFS,另一部分是数据计算 MapReduce。MapReduce 框架将数据处理分成 map,reduce 两段,使用起来比较麻烦,并且有一些限制,如:数据都是流式的,且必须所有 Map 结束后才能开始 Reduce。我们可以引入 Spark 加以改进。

 Spark 的优点在于它的中间结果保存在内存中,而非 HDFS 文件系统中,所以速度很快。用 Scala 语言可以像操作本地集合对象一样轻松地操作分布式数据集。虽然它支持中间结果保存在内存,但集群中的多台机器仍然需要读写数据集,所以它经常与 HDFS 共同使用。因此,它并非完全替代 Hadoop。

 Spark 的框架是使用 Scala 语言编写的,Spark 的开发可以使用语言有:Scala、R 语言、Java、Python。

2. Scala

 Scala 是一种类似 java 的编程语言,使用 Scala 语言相对来说代码量更少,调用 spark 更方便,也可以将它和其它程序混用。

  在不安装 scala 的情况下,启动 hadoop 和 spark,python 的基本例程也可以正常运行。但出于进一步开发的需要,最好安装 scala。

(1) 下载 scala

 http://www.scala-lang.org/download/

  我下载的是与 spark 中一致的 2.11 版本的非源码 tgz 包

(2) 安装

1
2
3
4
5
6
$ cd /home/hadoop #用户可选择安装的文件夹
$ tar xvzf tgz/scala-2.11.12.tgz
$ ln -s scala-2.11.12/ scala

在.bashrc中加入
export PATH=/home/hadoop/scala/bin:$PATH

3. 下载安装 Spark

(1) 下载 spark

 http://spark.apache.org/downloads.html

  我下载的版本是:spark-2.2.1-bin-hadoop.2.7.tgz

(2) 安装 spark

1
2
3
4
5
6
7
$ cd /home/hadoop #用户可选择安装的文件夹
$ tar xvzf spark-2.2.1-bin-hadoop2.7.tgz
$ ln -s spark-2.2.1-bin-hadoop2.7/ spark

在.bashrc中加入
export SPARK_HOME=/home/hadoop/spark
export PATH=$SPARK_HOME/bin:$PATH

(3) 配置文件

  不做配置,pyspark 可以在本机上运行,但不能使用集群中其它机器。配置文件在 $SPARK_HOME/conf/目录下。

  1. 配置 spark-env.sh
1
2
3
4
5
6
7
8
$ cd $SPARK_HOME/conf/
$ cp spark-env.sh.template spark-env.sh
按具体配置填写内容
export SCALA_HOME=/home/hadoop/scala
export JAVA_HOME=/exports/android/jdk/jdk1.8.0_91/
export SPARK_MASTER_IP=master
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/home/hadoop/hadoop/etc/hadoop/
  1. 设置主从服务器 slave
1
2
$ cp slaves.template slaves 
在其中列出从服务器地址,单机不用设
  1. 设置 spark-defaults.conf
1
2
3
4
5
6
7
$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
按具体配置填写内容
spark.master spark://master:7077
spark.eventLog.enabled false
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 1g
spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

(4) 启动

  运行 spark 之前,需要运行 hadoop,具体见之前的 Hadoop 文档

1
$ $SPARK_HOME/sbin/start-all.sh

  该脚本启动了所有 master 和 workers,在本机用 jps 查看,增加了 Worker 和 Master,

4. 命令行调用

  下面我们来看看从程序层面如何使用 Spark

(1) 准备工作

  在使用相对路径时,系统默认是从 hdfs://localhost:9000/中读数据,因此需要先把待处理的本地文件复制到 HDFS 上,常用命令见之前的 Hadoop 有意思。

1
2
$ hadoop fs -mkdir -p /usr/hadoop
$ hadoop fs -copyFromLocal README.md /user/hadoop/

(2) Spark 命令行

1
2
3
4
5
$ pyspark
>>> textFile = spark.read.text("README.md")
>>> textFile.count() # 返回行数
>>> textFile.first() # 返回第一行
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark")) # 返回所有含Spark行的数据集

5. 程序

(1) 实现功能

  统计文件中的词频

(2) 代码

  这里使用了 spark 自带的例程 /home/hadoop/spark/examples/src/main/python/wordcount.py,和之前介绍过的 hadoop 程序一样,同样是实现的针对 key,value 的 map,reduce,一个文件就完成了,看起来更简徢更灵活,像是 hadoop 自带 MapReduce 的加强版。具体内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: wordcount <file>", file=sys.stderr)
exit(-1)

spark = SparkSession\
.builder\
.appName("PythonWordCount")\
.getOrCreate()

lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
output = counts.collect() # 收集结果
for (word, count) in output:
print("%s: %i" % (word, count))

spark.stop()

(3) 运行

 spark-submit 命令在 $HOME_SPARK/bin 目录下,之前设置了 PATH,可以直接使用

1
$ spark-submit $SPARK_HOME/examples/src/main/python/wordcount.py /user/hadoop/README.md

  参数是 hdfs 中的文件路径。

  此时访问 $SPARK_IP:8080 端口,可以看到程序 PythonWordCount 正在 hadoop 中运行。

6. 多台机器上安装 Spark 以建立集群

  和 hadoop 的集群设置类似,同样是把整个 spark 目录复制集群中其它的服务器上,用 slaves 文件设置主从关系,然后启动 $SPARK_HOME/sbin/start-all.sh。正常开启后可以通过网页查看状态:SparkMaster_IP:8080

7. 参考

  1. 官方帮助文档,具体见其 python 部分

http://spark.apache.org/docs/latest/quick-start.html

  1. Hadoop2.7.3+Spark2.1.0 完全分布式环境 搭建全过程

https://www.cnblogs.com/purstar/p/6293605.html