加入收藏 | 设为首页 | 会员中心 | 我要投稿 北几岛 (https://www.beijidao.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

第2章:MapReduce

发布时间:2021-05-19 10:13:38 所属栏目:大数据 来源: https://www.jb51.cc
导读:MapReduce是一个数据处理的编程模型。这个模型很简单,但也不是简单到不能够支持一些有用的语言。Hadoop能够运行以多种语言写成的MapReduce程序。在这一章中,我们将看看怎样用Java,Ruby,Python语言来写同一个例子。更重要的是,MapReduce程序天生并发运行,
MapReduce是一个数据处理的编程模型。这个模型很简单,但也不是简单到不能够支持一些有用的语言。Hadoop能够运行以多种语言写成的MapReduce程序。在这一章中,我们将看看怎样用Java,Ruby,Python语言来写同一个例子。更重要的是,MapReduce程序天生并发运行,这就相当于把能够进行大数据分析的工具交到了某个拥有足够多机器的人手里。

气候数据集

在我们的例子中,将会写一个程序来挖掘天气数据。天气传感器每一个小时都会在全球的许多地方收集数据,并且也收集了大量的日志数据。这些数据非常适合于用MapReduce分析。因为我们想要处理所有数据,并且这些数据是半结构化的和面向记录的。

数据格式

我们所使用的数据来自于国家气候数据中心或称为NCDC。数据以行形式ASCII格式存储,每一行一条记录。这种格式支持丰富的气象属性集合,其中许多属性是可选的,长度可变的。简便起见,我们仅仅关注基本的属性,如温度。温度总是有值并且长度固定。
示例2-1显示了一行记录,并且将主要的属性进行了注释。这一行记录被分成了多行,每个属性一行。真实文件中,这些属性都会被放进一行,并且没有分隔符。

示例:2-1
0057
332130 # USAF 天气基站标识
99999 # WBAN 天气基站标识
19500101 # 观察日期
0300 # 观察时间
4
+51317 # 纬度 (角度 x 1000)
+028783 # 经度 (角度 x 1000)
FM-12
+0171 # 海拔 (米)
99999
V020
320 # 风向 (角度)
1 # 质量码
N
0072
1
00450 # 天空最高高度 (米)
1 # 质量码
C
N
010000 # 可见距离 (米)
1 # 质量码
N
9
-0128 # 空气温度 (摄氏度 x 10)
1 # 质量码
-0139 # 露点温度 (摄氏度 x 10)
1 # 质量码
10268 # 大气压 (百帕 x 10)
1 # 质量码

数据文件按照日期和天气基站整理。从1901到2001,每一年都有一个目录文件。每一个目录文件中包括每一个天气基站收集到的当年气候数据的压缩文件。例如1990年部分文件:

% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz

由于有成千上万个天气基站,所以每一年都由大量的相关小文件组成。通常处理少量的大文件更容易和有效。所以这些数据需要被预处理,使每一年的所有记录都被放到一个文件中(附录C中有详细的方法说明)。

使用Unix工具分析

如何获取每一年的全球最高温度呢?我们首先不使用Hadoop工具来回答这个问题。
这将会为我们提供一个性能基准线和检查我们往后的结果是否准确的方法。
经典的处理行结构数据的工具是awk。示例2-2向我们展示了如何获取每一年全球最高温度。

示例2-2
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"t"
gunzip -c $year | 
awk '{ temp = substr($0,88,5) + 0;
q = substr($0,93,1);
if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }'
done

这个脚本循环处理已经压缩的年文件,首先输出年度值,然后使用awk处理每一个文件。awk脚本从这些数据中提取出空气温度和质量码。空气温度通过加0转换成整数,下一步,判断温度(温度9999在NCDC中表示没检测到温度)和质量码是否有效。质量码表示此温度值是否准确或者错误。如果温度值没有问题,则与目前为止最高温度相比较,如果比目前最高温度高,则更新最高温度。当文件中所有行被处理之后,END块被执行,打印出最高温度。下面看看部分运行结果:

% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...

源文件中的温度值被扩大了10倍,所以1901年的最高温度是31.7摄氏度,由于在20世纪初读取到的气候值非常有限,所以这个结果只能是近似真实。在硬件是单个超大型高cpu EC2实例计算中跑完整个世纪的数据花了42分钟。

为了提高处理速度,我们需要并行运行部分程序。理论上,我们很容易想到可以使用计算机中所有可用的线程并行处理不同的年份数据。但是这样仍然存在一些问题。

首先,将整个处理工作进程等分为相同的部分并不简单或明显。在这个例子中,不同的年份的文件大小不一样,并且有的差别很大。所有一些处理进程将会完成地早一些,一些将会晚一些。即时完成早的进程再处理其它工作,整个运行时间仍然被最大的文件限制。一个更好的途径是将输入数据分成大小相等的块,并且处理每一个数据块。虽然这样可能造成更多的工作量。

第二,将每一个独立的处理结果合并在一起需要额外处理工作。在这个例子中,每一年的处理结果都是相互独立的。这些结果会被连接在一起,并且按年排序。如果通过数据量大小数据块途径,合并将更加容易出错。就这个例子而言,某一年的数据可能被分成多个数据块,每一个数据块都单独处理,并得到每一块的最高温度。最后,我们还需要找到某年中这些块中最高温度中的最高温度作为这一年的最高温度。

第三,你仍然会被单个计算机的处理能力限制。如果用单个计算机中所有的处理器,最快的处理时间是20分钟,那么,你不可能更快。而且有的数据集超过单个计算机的处理能力。当使用多台计算机一起处理时,一些其它的因素又会影响性性能,主要有协调性和可靠性两类。谁来执行所有的作业?我们将怎么处理失败的进程?

所以,虽然并行处理是可行的,但却是不那么容易控制的,是复杂的。使用像Hadoop这样的框架来处理这些问题极大地帮助了我们。

使用Hadoop分析数据

为了充分利用Hadoop提供的并行处理优势,我们需要将我们的查询写在一个MapReduce作业中。在本地的,小数据量地测试后,我们将能够在集群中运行它。

Map和Reduce

MapReduce将处理过程分成两阶段,map阶段和reduce阶段。每阶段将key-value键值对做为输入和输出。开发者可以选择输入输出参数类型,也能指定两个函数:map函数和reduce函数。

map阶段的输入数据是原始的NCDC数据。我们选择文本格式。文本中的每一行表示一条文本记录。key值是行开头距离当前文件开头的位移,但是我们不需要它,忽略即可。

map函数很简单。因为我们仅关心年份和温度,所以获取每行的年度和温度即可,其它属性不需要。这个例子中,仅仅是一个数据准备阶段,以某种方法准备reduce函数能够处理的数据。map函数还是一个丢弃坏记录的地方,例如那些没有测量到的,不准备的或错误的温度。

为了展现map怎么样工作的,选取少量的输入数据进行说明(为了适应页面宽度,一些没有使用到的列用省略号表示)
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
这些行以key-value的形式提供给map函数:
(0,0067011990999991950051507004...9999999N9+00001+99999999999...)
(106,0043011990999991950051512004...9999999N9+00221+99999999999...)
(212,0043011990999991950051518004...9999999N9-00111+99999999999...)
(318,0043012650999991949032412004...0500001N9+01111+99999999999...)
(424,0043012650999991949032418004...0500001N9+00781+99999999999...)
关键值是行的位移,在map函数中我们可以忽略它。map函数仅仅需要获取到年度和温度值(以粗体表示的数据),然后输出。输出的时候将温度值转换成整数。
(1950,0)
(1950,22)
(1950,?11)
(1949,111)
(1949,78)

map的输出结果在被送往reduce函数之前被MapReduce框架按照关键字排序合并处理。所以在进行下一步之前,reduce函数会接收到如下数据:
(1949,[111,78])
(1950,[0,22,?11])
如上所示,每一年的所有温度值都合并到一个列表中。reduce函数所要做的就是遍历每一年的温度,然后找到最高温度。
(1949,111)
(1950,22)
以上就是最终的输出:每一年的最高温度。
整个数据流程如图2-1所示。在图表底部是对应的Unix命令。它模拟整个MapReduce流程,我们将会在这章节的后面Hadoop Streaming中看到。图2-1 MapReduce逻辑数据流程图

JAVA MapReduce

在知道了MapReduce程序怎么样工作了之后,下一步是用代码实现它。我们需要做三件事情:map函数,reduce函数,运行作业的代码。map功能以Mapper抽象类表示
,它申明了一个map()抽象方法。示例2-3显示了map函数的实现。

示例2-3
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
      extends Mapper<LongWritable,Text,IntWritable> {
      private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key,Text value,Context context)
          throws IOException,InterruptedException {
          String line = value.toString();
          String year = line.substring(15,19);
          int airTemperature;
          if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
            airTemperature = Integer.parseInt(line.substring(88,92));
          } else {
            airTemperature = Integer.parseInt(line.substring(87,92));
          }
          String quality = line.substring(92,93);
          if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year),new IntWritable(airTemperature));
         }
     }
}

Mapper类是一个泛型,有四个形参,分别表示输入key,输入值,输出key,和map函数输出值类型。就当前的例子来说,输入key是一个长整型的位移,输入值是一行文本,输出key是年份,输出会是是空气温度(整数)。Hadoop使用它自己的基本类型集而不使用JAVA内建的基本类型。因为Hadoop自己的基本类型对网络序列化进行了优化。这些基本类型可以在 org.apache.hadoop.io pack

(编辑:北几岛)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读