10:衍生数据-批处理

第十章-衍生数据-批处理

文章主要内容来自于设计数据密集型应用

前面的文章讨论了很多关于请求查询以及相应的响应结果。许多现有数据系统中都采用这种数据处理方式:你发送请求指令,一段时间后(我们期望)系统会给出一个结果。数据库,缓存,搜索索引,Web服务器以及其他一些系统都以这种方式工作。
三种不同类型的系统:

服务(在线系统)

服务等待客户的请求或指令到达。每收到一个,服务会试图尽快处理它,并发回一个响应。响应时间通常是服务性能的主要衡量指标,可用性通常非常重要(如果客户端无法访问服务,用户可能会收到错误消息)。

批处理系统(离线系统)

一个批处理系统有大量的输入数据,跑一个作业(job)来处理它,并生成一些输出数据,这往往需要一段时间(从几分钟到几天),所以通常不会有用户等待作业完成。相反,批量作业通常会定期运行(例如,每天一次)。批处理作业的主要性能衡量标准通常是吞吐量(处理特定大小的输入所需的时间)。本章中讨论的就是批处理。

流处理系统(准实时系统)

流处理介于在线和离线(批处理)之间,所以有时候被称为准实时(near-real-time)准在线(nearline)处理。像批处理系统一样,流处理消费输入并产生输出(并不需要响应请求)。但是,流式作业在事件发生后不久就会对事件进行操作,而批处理作业则需等待固定的一组输入数据。这种差异使流处理系统比起批处理系统具有更低的延迟。

本章讨论批处理系统。

使用Unix工具的批处理

从一个例子开始。
Web 服务器,每次处理请求,在日志中附加一行。使用nginx默认的访问日志格式。

1
2
3
216.58.210.78 - - [27/Feb/2015:17:55:11 +0000] "GET /css/typography.css HTTP/1.1" 
200 3377 "http://martin.kleppmann.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.115 Safari/537.36"

日志格式:

1
2
$remote_addr - $remote_user [$time_local] "$request"
$status $body_bytes_sent "$http_referer" "$http_user_agent"

简单日志分析

想在你的网站上找到五个最受欢迎的网页:

1
2
3
4
5
6
cat /var/log/nginx/access.log | #1
awk '{print $7}' | #2
sort | #3
uniq -c | #4
sort -r -n | #5
head -n 5 #6

输出结果:

1
2
3
4
5
4189 /favicon.ico
3631 /2013/05/24/improving-security-of-ssh-private-keys.html
2124 /2012/12/05/schema-evolution-in-avro-protocol-buffers-thrift.html
1369 /
915 /css/typography.css

特点:简单且强大。几分钟内完成许多数据分析,性能非常好。

命令链与自定义程序

除了上述的 Unix 命令链。也可以用程序实现,例如在Ruby中:

1
2
3
4
5
6
7
8
9
10
counts = Hash.new(0)         # 1
File.open('/var/log/nginx/access.log') do |file|
file.each do |line|
url = line.split[6] # 2
counts[url] += 1 # 3
end
end

top5 = counts.map{|url, count| [count, url] }.sort.reverse[0...5] # 4
top5.each{|count, url| puts "#{count} #{url}" } # 5

特点:不简洁,但可读性强。性能较差。

排序 VS 内存中的聚合

上述两种做法的区别:

  • Ruby 脚本在内存中保存了 URL 的哈希表,保存 URL 和其出现的次数。
  • Unix 管道没有哈希表,依赖排序。

哪个好?

  • 工作集较小时,内存散列表表现良好;
  • 如果作业的工作集大于可用内存,则排序的方法优点是可以高效地使用磁盘。
    • GNU Coreutils(Linux)中的sort程序通过溢出至磁盘的方式来自动应对大于内存的数据集,并能同时使用多个CPU核进行并行排序。
    • 瓶颈可能是从磁盘读取输入文件的速度。

Unix哲学

Unix管道的发明者道格·麦克罗伊(Doug McIlroy)在1964年说:让数据像水管一样流动。

  1. 让每个程序都做好一件事。要做一件新的工作,写一个新程序,而不是通过添加“功能”让老程序复杂化。
  2. 期待每个程序的输出成为另一个程序的输入。不要将无关信息混入输出。避免使用严格的列数据或二进制输入格式。不要坚持交互式输入。
  3. 设计和构建软件时,即使是操作系统,也让它们能够尽早地被试用,最好在几周内完成。不要犹豫,扔掉笨拙的部分,重建它们。
  4. 优先使用工具来减轻编程任务,即使必须曲线救国编写工具,且在用完后很可能要扔掉大部分。

这种方法 —— 自动化,快速原型设计,增量式迭代,对实验友好,将大型项目分解成可管理的块 —— 听起来非常像今天的敏捷开发和DevOps运动。奇怪的是,四十年来变化不大。

Unix 怎么实现命令的可组合性?

统一的接口

为什么要统一的接口?

  • 一个程序的输出作为另外一个程序的输入,那么必然要统一。

怎么实现统一的接口?

  • 在Unix中,这种接口是一个文件(file)(更准确地说,是一个文件描述符)。
  • 一个文件只是一串有序的字节序列。
  • 这是一个非常简单的接口,所以可以使用相同的接口来表示许多不同的东西:
    • 文件系统上的真实文件,
    • 另一个进程(Unix套接字,stdin,stdout)的通信通道
    • 设备驱动程序(比如/dev/audio或/dev/lp0)
    • 表示TCP连接的套接字等等。

每个程序怎么操作?

  • 按照惯例,许多(但不是全部)Unix程序将这个字节序列视为ASCII文本。
  • 将它们的输入文件视为由\n(换行符,ASCII 0x0A)字符分隔的记录列表。
  • 每条记录(即一行输入)的解析则更加模糊。 Unix工具通常通过空白或制表符将行分割成字段,但也使用CSV(逗号分隔),管道分隔和其他编码。

今天,像Unix工具一样流畅地运行程序是一种例外,而不是规范。

逻辑与布线相分离

Unix 怎么做输入输出:

  • Unix工具的另一个特点是使用标准输入(stdin)和标准输出(stdout)。
    • 如果你运行一个程序,而不指定任何其他的东西,标准输入来自键盘,标准输出指向屏幕。
    • 但是,你也可以从文件输入和/或将输出重定向到文件。
    • 管道允许你将一个进程的标准输出附加到另一个进程的标准输入(有个小内存缓冲区,而不需要将整个中间数据流写入磁盘)。
  • 将输入/输出布线与程序逻辑分开,可以将小工具组合成更大的系统。

Unix 输入输出的缺点:

  • 需要多个输入或输出的程序虽然可能,却非常棘手。
  • 你没法将程序的输出管道连接至网络连接中。
  • 如果程序直接打开文件进行读取和写入,或者将另一个程序作为子进程启动,或者打开网络连接,那么I/O的布线就取决于程序本身了。
  • 它仍然可以被配置(例如通过命令行选项),但在Shell中对输入和输出进行布线的灵活性就少了。

透明度和实验

使Unix工具如此成功的部分原因是,它们使查看正在发生的事情变得非常容易:

  • Unix命令的输入文件通常被视为不可变的。这意味着你可以随意运行命令,尝试各种命令行选项,而不会损坏输入文件。
  • 你可以在任何时候结束管道,将管道输出到less,然后查看它是否具有预期的形式。这种检查能力对调试非常有用。
  • 你可以将一个流水线阶段的输出写入文件,并将该文件用作下一阶段的输入。这使你可以重新启动后面的阶段,而无需重新运行整个管道。

优点

  • 与关系数据库的查询优化器相比,即使Unix工具非常简单,但仍然非常有用,特别是对于实验而言。

缺点

  • Unix工具的最大局限在于它们只能在一台机器上运行 —— 而Hadoop这样的工具即应运而生。

MapReduce和分布式文件系统

MapReduce 是什么?

  • 有点像 Unix 工具,不过分布在数千台机器上。
  • 简单、易用
  • 通常不会修改输入,除了生成输出外没有任何副作用。
  • 输出文件以连续的方式一次性写入(一旦写入文件,不会修改任何现有的文件部分)。
  • 读写的是 HDFS 等分布式文件系统。
    • 基于无共享的原则。
    • 不需要特殊的硬件。
    • 每台机器运行了一个守护进程,对外暴露网络服务;
    • 名为 NameNode的中央处理器跟踪哪个文件块存储在哪台机器上;
    • 为了容忍机器故障,文件块会被复制到多台机器上;
    • 可伸缩性很好:上万台机器,PB 级别存储。

MapReduce作业执行

MapReduce 怎么运行的?
以上文中的 Web 服务器日志分析为例。

  1. 读取一组输入文件,并将其分解成记录(records)。在Web服务器日志示例中,每条记录都是日志中的一行(即\n是记录分隔符)。
  2. 调用Mapper函数,从每条输入记录中提取一对键值。在前面的例子中,Mapper函数是awk '{print $7}':它提取URL($7)作为键,并将值留空。
  3. 按键排序所有的键值对。在日志的例子中,这由第一个sort命令完成。
  4. 调用Reducer函数遍历排序后的键值对。如果同一个键出现多次,排序使它们在列表中相邻,所以很容易组合这些值而不必在内存中保留很多状态。在前面的例子中,Reducer是由uniq -c命令实现的,该命令使用相同的键来统计相邻记录的数量。

步骤 2(Map)和步骤 4 (Reduce)需要自己编程写代码。
步骤 1 由输入格式解析器处理;
步骤 3 隐含在 MapReduce 中——因为Mapper的输出始终在送往Reducer之前进行排序。

如何自定义 Mapper 和 Reducer?
Mapper

  • Mapper 会在每条输入记录上调用一次,其工作是从输入记录中提取键值。
  • 对于每个输入,它可以生成任意数量的键值对(包括None)。
  • 它不会保留从一个输入记录到下一个记录的任何状态,因此每个记录都是独立处理的

Reducer

  • MapReduce 框架拉取由 Mapper 生成的键值对,收集属于同一个键的所有值,并在这组值上迭代调用 Reducer。
  • Reducer 可以产生输出记录(例如相同URL的出现次数)。

如果 reduce 之后还需要再排序怎么办?

  • 再编写第二个 MapReduce 作业并将第一个作业的输出用作第二个作业的输入来实现它。

分布式执行MapReduce

MapReduce 和 Unix 管道的主要区别?

  • MapReduce 在多个机器上并行执行计算,而无需编写代码来显式处理并行问题。
  • Mapper和Reducer一次只能处理一条记录;它们不需要知道它们的输入来自哪里,或者输出去往什么地方,所以框架可以处理在机器之间移动数据的复杂性。

Hadoop MapReduce作业中的数据流
image.png如上图

Mapper 的分区?

  • 并行化基于分区:作业的输入通常是HDFS中的一个目录,输入目录中的每个文件或文件块都被认为是一个单独的分区,可以单独处理map任务
  • 每个输入文件的大小通常是数百兆字节。
  • MapReduce调度器(图中未显示)试图在其中一台存储输入文件副本的机器上运行每个Mapper,只要该机器有足够的备用RAM和CPU资源来运行Mapper任务。
  • 这个原则被称为将计算放在数据附近:它节省了通过网络复制输入文件的开销,减少网络负载并增加局部性。

Mapper 任务的代码在运行它的机器上还不存在咋办?

  • MapReduce框架首先将代码(例如Java程序中的JAR文件)复制到适当的机器。
  • 然后启动Map任务并开始读取输入文件,一次将一条记录传入Mapper回调函数。
  • Mapper的输出由键值对组成。

Reducer 的分区?

  • 计算的Reduce端也被分区。
  • 虽然Map任务的数量由输入文件块的数量决定,但Reducer的任务的数量是由作业作者配置的(它可以不同于Map任务的数量)。
  • 为了确保具有相同键的所有键值对最终落在相同的Reducer处,框架使用键的散列值来确定哪个Reduce任务应该接收到特定的键值对。

排序怎么做的?

  • 键值对必须进行排序,但数据集可能太大,无法在单台机器上使用常规排序算法进行排序。
  • 相反,分类是分阶段进行的。首先每个Map任务都按照Reducer对输出进行分区。每个分区都被写入Mapper程序的本地磁盘,使用的技术与我们在“SSTables与LSM树”中讨论的类似。
  • 只要当Mapper读取完输入文件,并写完排序后的输出文件,MapReduce调度器就会通知Reducer可以从该Mapper开始获取输出文件。
  • Reducer连接到每个Mapper,并下载自己相应分区的有序键值对文件。按Reducer分区,排序,从Mapper向Reducer复制分区数据,这一整个过程被称为混洗(shuffle)。(一个容易混淆的术语 —— 不像洗牌,在MapReduce中的混洗没有随机性)。

Reduce 做了什么?

  • Reduce任务从Mapper获取文件,并将它们合并在一起,并保留有序特性。
  • 因此,如果不同的Mapper生成了键相同的记录,则在Reducer的输入中,这些记录将会相邻。
  • Reducer调用时会收到一个键,和一个迭代器作为参数,迭代器会顺序地扫过所有具有该键的记录(因为在某些情况可能无法完全放入内存中)。
  • Reducer可以使用任意逻辑来处理这些记录,并且可以生成任意数量的输出记录。这些输出记录会写入分布式文件系统上的文件中(通常是在跑Reducer的机器本地磁盘上留一份,并在其他机器上留几份副本)。

MapReduce工作流

单个MapReduce 的局限?

  • 单个MapReduce作业可以解决的问题范围很有限。
  • 以日志分析为例,单个MapReduce作业可以确定每个URL的页面浏览次数,但无法确定最常见的URL,因为这需要第二轮排序。
  • 因此将MapReduce作业链接成为工作流(workflow) 中是极为常见的

Hadoop MapReduce 怎么实现工作流?

  • Hadoop MapReduce框架对工作流没有特殊支持,所以这个链是通过目录名隐式实现的:
    • 第一个作业必须将其输出配置为HDFS中的指定目录;
    • 第二个作业必须将其输入配置为从同一个目录。
  • 从MapReduce框架的角度来看,这是两个独立的作业。
  • 被链接的MapReduce作业并没有那么像Unix命令管道(它直接将一个进程的输出作为另一个进程的输入,仅用一个很小的内存缓冲区)。
  • 它更像是一系列命令,其中每个命令的输出写入临时文件,下一个命令从临时文件中读取。
  • 这种设计有利也有弊,我们将在“物化中间状态”中讨论。

如何处理工作流中不同作业之间的依赖?

  • 只有当作业成功完成后,批处理作业的输出才会被视为有效的(MapReduce会丢弃失败作业的部分输出)。
  • 因此,工作流中的一项作业只有在先前的作业(即生产其输入的作业)成功完成后才能开始。
  • 为了处理这些作业之间的依赖,有很多针对Hadoop的工作流调度器被开发出来,包括Oozie,Azkaban,Luigi,Airflow和Pinball
  • 在维护大量批处理作业时非常有用。在构建推荐系统时,由50到100个MapReduce作业组成的工作流是常见的;大型组织中,许多不同的团队可能运行不同的作业来读取彼此的输出。

Reduce侧Join与分组

第二章讨论过数据数据模型和查询语言的Join(连接),这里讨论它是如何实现的。

两条记录存在关联是很常见的:

  • 关系模型中的外键,文档模型中的文档引用或图模型中的
  • 当你需要同时访问这一关联的两侧(持有引用的记录与被引用的记录)时,join 就是必须的。

MapReduce 可以通过索引来关联数据吗?

  • 在数据库中,如果执行只涉及少量记录的查询,数据库通常会使用索引来快速定位感兴趣的记录。如果查询涉及到join ,则可能涉及到查找多个索引。
  • 然而MapReduce没有索引的概念 —— 至少在通常意义上没有。

MapReduce 作业怎么找到想要读的部分数据?

  • 它读取所有这些文件的全部内容;(数据库中称之为全表扫描)
  • 如果你只想读取少量的记录,则全表扫描与索引查询相比,代价非常高昂。
  • 但通常是需要计算大量记录的聚合。在这种情况下,特别是如果能在多台机器上并行处理时,扫描整个输入可能是相当合理的事情。
  • 因此,我们在批处理语义中讨论链接时,一般是同时处理所有用户的数据,而非某个特定用户的数据。

示例:用户活动事件分析

一个批处理作业中join 的典型例子。左侧是事件日志,描述登录用户在网站上做的事情(称为活动事件(activity events)点击流数据(clickstream data)),右侧是用户数据库。
image.png
分析任务需要将用户活动与用户档案信息相关联:例如,如果档案包含用户的年龄或出生日期,系统就可以确定哪些页面更受哪些年龄段的用户欢迎。

有什么简单方法实现上述任务?

  • 逐个遍历活动事件,并为每个遇到的用户ID查询用户数据库(在远程服务器上)。
    • 缺点:性能很差,吞吐量低,可能压垮数据库。

为什么批处理任务不用上面的方法?

  • 为了在批处理过程中实现良好的吞吐量,计算必须(尽可能)在单台机器上进行(而不是访问外部数据,如数据库)。
  • 为待处理的每条记录发起随机访问的网络请求实在是太慢了。
  • 查询远程数据库意味着批处理作业变为非确定的(nondeterministic),因为远程数据库中的数据可能会改变。

批处理任务中,更好的实现方法是什么?

  • 更好的方法是获取用户数据库的副本(ETL,数据仓库),并将它和用户行为日志放入同一个分布式文件系统中,如 HDFS。
  • 然后用MapReduce将所有相关记录集中到同一个地方进行高效处理。

排序合并连接

join 过程中 Mapper 的工作?
Mapper的目的是从每个输入记录中提取一对键值,在上面 hadoop的 map reduce 的情况下,这个键就是用户ID:

  • 一组Mapper会扫过活动事件(提取用户ID作为键,活动事件作为值)
  • 而另一组Mapper将会扫过用户数据库(提取用户ID作为键,用户的出生日期作为值)。

过程如下所示:
image.png
join 过程中怎么做分区?

  • 当MapReduce框架通过对Mapper输出进行分区,然后对键值对进行排序时,效果是具有相同ID的所有活动事件和用户记录在Reducer输入中彼此相邻。(在上图中,三个104分区到了parition 1,173 和 103 分到 partition 2)
  • Map-Reduce作业甚至可以也让这些记录排序,使Reducer总能先看到来自用户数据库的记录,紧接着是按「出生日期」排序的活动事件 —— 这种技术被称为二次排序(secondary sort)。

join 过程中 reducer 的工作?

  • Reducer可以容易地执行实际的join 逻辑:每个用户ID都会被调用一次Reducer函数,且因为二次排序,第一个值应该是来自用户数据库的「出生日期」记录。
  • Reducer将「出生日期」存储在局部变量中,然后使用相同的用户ID遍历活动事件,输出已观看网址观看者年龄的结果对。随后的Map-Reduce作业可以计算每个URL的查看者年龄分布,并按年龄段进行聚集。

为什么叫做排序合并连接?

  • 由于Reducer一次处理一个特定用户ID的所有记录,因此一次只需要将一条用户记录保存在内存中,而不需要通过网络发出任何请求。
  • 这个算法被称为排序合并连接(sort-merge join),因为Mapper的输出是按键排序的,然后Reducer将来自join 两侧的有序记录列表合并在一起。

把相关数据放在一起

为什么需要把相关数据放在一起?

  • 在排序合并join 中,Mapper和排序过程确保了所有对特定用户ID执行join 操作的必须数据都被放在同一个地方:单次调用Reducer的地方。
  • 预先排好了所有需要的数据,Reducer可以是相当简单的单线程代码,能够以高吞吐量和与低内存开销扫过这些记录。

为什么可以看做“消息”传递?

  • Mapper将“消息”发送给Reducer。
  • 当一个Mapper发出一个「键值对」时,这个「键」的作用就像值应该传递到的目标地址。
  • 即使「键」只是一个任意的字符串(不是像IP地址和端口号那样的实际的网络地址),它表现的就像一个地址:所有具有相同键的键值对将被传递到相同的目标(一次Reducer的调用)。

MapReduce 编程模型中用户需要考虑网络通信吗?

  • 不用
  • 使用MapReduce编程模型,能将计算的物理网络通信层面(从正确的机器获取数据)从应用逻辑中剥离出来(获取数据后执行处理)。
  • 这种分离与数据库的典型用法形成了鲜明对比,从数据库中获取数据的请求经常出现在应用代码内部。
  • 由于MapReduce处理了所有的网络通信,因此它也避免了让应用代码去担心部分故障,例如另一个节点的崩溃:MapReduce在不影响应用逻辑的情况下能透明地重试失败的任务。

分组

除了 join(连接) 之外,还有什么“把相关数据放在一起”的方法?

  • 常见的还有:按某个键对记录分组(如SQL中的GROUP BY子句)。
  • 所有带有相同键的记录构成一个组,而下一步往往是在每个组内进行某种聚合操作。比如:
    • 统计每个组中记录的数量(例如在统计PV的例子中,在SQL中表示为COUNT(*)聚合)
    • 对某个特定字段求和(SQL中的SUM(fieldname)
    • 按某种分级函数取出排名前k条记录。

MapReduce实现这种「分组」操作?

  • 最简单方法是设置Mapper,以便它们生成的键值对使用所需的分组键
  • 然后分区和排序过程将所有具有相同分区键的记录导向同一个Reducer。
  • 因此在MapReduce之上实现分组和join 看上去非常相似。

分组还有什么应用?

  • 分组的另一个常见用途是整理特定用户会话的所有活动事件,以找出用户进行的一系列操作(称为会话化(sessionization))。
  • 例如:
    • 可以使用这种分析来确定显示新版网站的用户是否比那些显示旧版本的用户更有购买欲(A/B测试)
    • 或者计算某个营销活动是否有效。

当有多个 web 服务器,特定用户活动事件分散在多个不同机器上怎么办?

  • 可以通过使用会话cookie,用户ID或类似的标识符作为分组键,以将特定用户的所有活动事件放在一起来实现会话化
  • 与此同时,不同用户的事件仍然散布在不同的分区中

处理偏斜

什么是热键?

  • 如果存在与单个键关联的大量数据,则“将具有相同键的所有记录放到相同的位置”这种模式就被破坏了。
  • 这种不成比例的活动数据库记录被称为关键对象(linchpin object)【38】或热键(hot key)

什么是倾斜?

  • 在单个Reducer中收集与某个名人相关的所有活动(例如他们发布内容的回复)可能导致严重的偏斜(也称为热点(hot spot))—— 也就是说,一个Reducer必须比其他Reducer处理更多的记录。
  • 由于MapReduce作业只有在所有Mapper和Reducer都完成时才完成,所有后续作业必须等待最慢的Reducer才能启动。

怎么解决数据倾斜?

  • 如果join 的输入存在热键,可以使用一些算法进行补偿。

以 Pig 中解决数据倾斜为例:

  • 例如,Pig中的偏斜连接(skewed join) 方法首先运行一个抽样作业(Sampling Job)来确定哪些键是热键。
  • join 实际执行时,Mapper会将热键的关联记录随机(相对于传统MapReduce基于键散列的确定性方法)发送到几个Reducer之一
  • 对于另外一侧的 join 输入,与热键相关的记录需要被复制所有处理该键的Reducer上。
  • 这种技术将处理热键的工作分散到多个Reducer上,这样可以使其更好地并行化,代价是需要将 join 另一侧的输入记录复制到多个Reducer上。
    • 优点:将处理热键的工作分散到多个Reducer上,这样可以使其更好地并行化
    • 缺点:代价是需要将 join 另一侧的输入记录复制到多个Reducer上。

Crunch处理数据倾斜:

  • Crunch中的分片连接(sharded join) 方法与之类似,但需要显式指定热键而不是使用抽样作业。

Hive 处理数据倾斜:

  • Hive 偏斜 join 优化采取了另一种方法,它需要在表格元数据中显式指定热键,并将与这些键相关的记录单独存放,与其它文件分开。
  • 当在该表上执行 join 时,对于热键,它会使用**Map端 join **(请参阅下一节)
  • 当按照热键进行分组并聚合时,可以将分组分两个阶段进行:
    • 第一个MapReduce阶段将记录发送到随机Reducer,以便每个Reducer只对热键的子集执行分组,为每个键输出一个更紧凑的中间聚合结果。
    • 然后,第二个MapReduce作业将所有来自第一阶段Reducer的中间聚合结果合并为每个键一个值。

Map侧 join

怎么理解上文是 Reducer 侧 join :

  • 上一节中的链接算法都是在 Reducer 中执行的,所以被称为 reduce 侧 join。
  • Mapper 扮演数据预处理的角色,从每个输入记录中提取键值,将键值分配给 Reducer 分区,并按键排序。
    • 优点:不用对输入数据做任何假设:不用管属性和结构,Mapper 都可以做预处理以备 Join
    • 缺点:排序,复制到 Reducer,合并 Reducer 输入,这些操作可能开销巨大。数据可能要落盘好几次,取决于可用的内存缓冲区。

Map 侧 Join:

  • 如果你对数据做出某些假设,就可以用 Map 侧 Join来加快速度。
  • 省掉了 Reducer 与排序的 MapReduce 作业,每个 mapper 都是简单的从分布式文件系统中读取一个输入,然后输出到文件系统,仅此而已。

广播散列连接

广播散列连接(broadcast hash join)

  • 一个最简单场景是大数据集与小数据集连接的情况。要点在于小数据集需要足够小,以便可以将其全部加载到每个Mapper的内存中。
  • 每个 Mapper 在扫描「大数据集」时,简单地从内存散列表中查找每个『小数据集』的数据。
  • 广播:每个 join 较大输入端分区的 Mapper 都会将较小输入端数据集整个读入内存中(所以较小输入实际上“广播”到较大数据的所有分区上)
  • 散列:反映了它使用一个散列表。
  • Pig, Hive, Cascading和Crunch 都支持这种 join 。
  • 此外,还可以将较小输入存储在本地磁盘中,索引在内存中。

分区散列连接

  • 如果Map侧 join 的输入以相同的方式进行分区,则散列连接方法可以独立应用于每个分区。
  • 每个Mapper只需要从输入两端各读取一个分区就足够了。
  • 好处是每个Mapper都可以在内存散列表中少放点数据。
  • 适用场景:这种方法只有当连接两端输入有相同的分区数,且两侧的记录都是使用相同的键与相同的哈希函数做分区时才适用。如果输入是由之前执行过这种分组的MapReduce作业生成的,那么这可能是一个合理的假设。

Map侧合并连接

  • 如果数据集以相同的方式进行分区、还基于相同的键进行排序,那么还可以用 Map 侧连接的变体。
  • 输入是否小到能放到内存不重要。
  • 因为这时候Mapper同样可以执行归并操作(通常由Reducer执行):按键递增的顺序依次读取两个输入文件,将具有相同键的记录配对。

MapReduce工作流与Map侧连接

影响到输出:

  • 当下游作业使用MapReduce join 的输出时,Map侧连接或Reduce侧 join 的不同选择会影响输出的结构。
  • Reduce侧连接的输出是按照连接键进行分区和排序的;
  • Map端连接的输出则按照与「大数据集」相同的方式进行分区和排序;

对输入有假设:

  • Map侧连接也对输入数据集的大小,有序性和分区方式做出了更多假设。
  • 你必须了解数据按哪些键做的分区和排序,以及分区数量等。

批处理工作流的输出

一个疑问:MapReduce处理完成之后的最终结果是什么?我们最开始为什么要跑这些作业?

在数据库查询的场景中,我们将事务处理(OLTP)与分析两种目的区分开来:

  • OLTP查询通常根据键查找少量记录,使用索引,并将其呈现给用户(比如在网页上)。
  • 分析查询通常会扫描大量记录,执行分组与聚合,输出通常有着报告的形式:显示某个指标随时间变化的图表,或按照某种排位取前10项,或将一些数字细化为子类。这种报告的消费者通常是需要做出商业决策的分析师或经理。

批处理放哪里合适?

  • 它不属于事务处理,也不是分析。
  • 它和分析比较接近,因为批处理通常会扫过输入数据集的绝大部分。
  • 但批处理过程的输出通常不是报表,而是一些其他类型的结构。

建立搜索索引

MapReduce诞生的背景

  • Google 最初使用 MapReduce 是为其搜索引擎建立索引
  • Hadoop MapReduce 仍然是为 Lucene/Solr 构建索引的好方法

Lucene这样的全文搜索索引是如何工作的?

  • 它是一个文件(关键词字典),你可以在其中高效地查找特定关键字,并找到包含该关键字的所有文档ID列表(文章列表)。

如何用 MapReduce 构建索引?

  • Mapper根据需要对文档集合进行分区,每个Reducer构建该分区的索引,并将索引文件写入分布式文件系统。
  • 构建这样的文档分区索引并行处理效果拔群。

索引怎么更新?

  • 由于按关键字查询搜索索引是只读操作,因而这些索引文件一旦创建就是不可变的。
  • 如果索引的文档集合发生更改
    • 一种选择是定期重跑整个索引工作流,并在完成后用新的索引文件批量替换以前的索引文件。
      • 如果只有少量的文档发生了变化,这种方法的计算成本可能会很高。
      • 优点:是索引过程很容易理解:文档进,索引出
    • 另一种选择是增量建立索引。如果需要索引中添加,删除或更新文档,Lucene会写新的段文件,并在后台异步合并压缩段文件。

键值存储作为批处理输出

除了建立搜索索引之外,批处理还有什么用途?

  • 构建机器学习系统,例如分类器(比如垃圾邮件过滤器,异常检测,图像识别)与推荐系统

机器学习系统的批处理输出到哪里?

  • 通常是某种数据库,例如可以通过给定 ID 查询其推荐好友的数据库

这些数据库怎么用?

  • 通常需要被 Web 服务所查询。

批处理过程的输出到Web应用可以查询的数据库中呢?

  • 一条条插入到数据库

为什么一条条插入数据库不是好主意呢?

  • 每条记录发起一个网络请求,比批处理的吞吐慢了几个量级
  • 数据库可能被压垮,导致线上故障
  • 当批任务重试的时候,得操心数据库的情况

如果不一条条插入数据库,还有什么好主意?

  • 在批处理作业创建一个全新的数据库,并将其作为文件写入分布式文件系统中作业的输出目录,就像上节中的搜索索引一样。
  • 这些数据文件一旦写入就是不可变的,可以批量加载到处理只读查询的服务器中。
  • 不少键值存储都支持在MapReduce作业中构建数据库文件,包括Voldemort ,Terrapin ,ElephantDB 和 HBase 批量加载。

这么做的优点?

  • 因为文件只读(不再修改),所以数据结构很简单,不用预写式日志(WAL)
  • 在Voldemort加载数据时,使用旧文件继续提供服务,当加载完成自动将查询切换到新文件。如果出现问题,则回滚。

批处理输出的哲学

Unix哲学鼓励以显式指明数据流的方式进行实验:程序读取输入并写入输出。

  • 输入保持不变;没有副作用;可以随意改动或调试;

MapReduce作业的输出处理遵循同样的原理。

  • 如果代码错误或者输出损坏,可以回滚代码,然后重跑。输出就会被修正。
    • 数据库没有这个属性:回滚代码无法修复数据库中的数据
    • 能够从错误代码中恢复的概念被称为人类容错(human fault tolerance)
  • 由于回滚容易,可以容忍错误。这种最小化不可逆性(minimizing irreversibility) 的原则有利于敏捷软件开发。
  • 如果Map或Reduce任务失败,MapReduce框架将自动重新调度,并在同样的输入上再次运行它。
    • 如果是代码错误,那么几次重试后将失败;
    • 如果是临时问题导致的,那么故障被容忍;
    • 由于输入不可变,所以重试是安全的。
  • 同一组文件可用作各种不同作业的输入
  • 与Unix工具类似,MapReduce作业将逻辑与布线(配置输入和输出目录)分离,这使得关注点分离,可以重用代码:一个团队可以专注实现一个做好一件事的作业;而其他团队可以决定何时何地运行这项作业。

Unix 与 Hadoop 的不同:

  • 大多数 Unix 工具都假设输入输出是无类型文本文件,所以它们必须做大量的输入解析工作(本章开头的日志分析示例使用 {print $7} 来提取URL)。
  • 在 Hadoop 上可以通过使用更结构化的文件格式消除一些低价值的语法转换:比如 Avro(请参阅“Avro”)和 Parquet(请参阅“列存储”)经常使用,因为它们提供了基于模式的高效编码,并允许模式随时间推移而演进(见第四章)。

Hadoop与分布式数据库的对比

MapReduce 与 大规模并行处理(MPP, massively parallel processing)的区别?

  • MPP数据库专注于在一组机器上并行执行分析SQL查询
  • 而MapReduce和分布式文件系统的组合则更像是一个可以运行任意程序的通用操作系统。

存储多样性

分布式文件系统与数据库的区别?

  • 数据库要求你根据特定的模型(例如关系或文档)来构造数据
  • 而分布式文件系统中的文件只是字节序列,可以使用任何数据模型和编码来编写。

使用原始格式保存数据的优点?

  • 实践经验表明,简单地使数据快速可用 —— 即使它很古怪,难以使用,使用原始格式 —— 也通常要比事先决定理想数据模型要更有价值
  • 以原始形式收集数据,稍后再操心模式的设计,能使数据收集速度加快(有时被称为“数据湖(data lake)”或“企业数据中心(enterprise data hub)
  • 转移了解释数据的负担:数据的解释成为消费者的问题(读时模式),有利于跨团队合作。
  • 这种方法被称为寿司原则(sushi principle):“原始数据更好”

处理模型的多样性

MPP 数据库和 MapReduce 在处理模型上的区别?

  • MPP 数据库是单体的,紧密集成的软件,负责磁盘上的存储布局,查询计划,调度和执行。
    • 优点:可以针对数据库特定调优;可以用 SQL 查询语言表达查询,无需代码。
    • 缺点:并非所有类型的处理都可以合理地表达为SQL查询(如索引、推荐系统)。
  • MapReduce 能够轻松地在大型数据集上运行自己的代码。
    • 优点
      • 语法上:可以建立一个SQL查询执行引擎,如 Hive 项目;也可以编写代码;还可以编写其他模型;
      • 运行上:分布式。

针对频繁故障设计

MPP 数据库和 MapReduce 在处理故障上的区别?

  • 与在线系统相比,批处理系统对故障不敏感。
  • MPP 数据库:
    • 如果有一个节点执行查询时失败,MPP 数据库会中止整个查询,并让用户重试。
    • 查询通常在几分钟内,因此这种重试可以接受。
    • 倾向于内存中保留尽可能多的数据,以避免磁盘读取的开销
  • MapReduce:
    • 可以容忍单个Map或Reduce任务的失败,而不会影响作业的整体,通过以单个任务的粒度重试工作。
    • 它也会非常急切地将数据写入磁盘,一方面是为了容错,另一部分是因为假设数据集太大而不能适应内存。

MapReduce 在处理故障上的优点?

  • 更适合大的作业(任务多,易失败)
  • 不是因为硬件很不可靠,而是因为任意终止进程的自由有利于提高计算集群中的资源利用率
    • 场景:如果优先级较高的任务需要更多的资源,则可以终止(抢占)同一台机器上较低优先级的任务以释放资源。从而提高资源利用率。
    • 在谷歌,运行一个小时的MapReduce任务有大约有5%的风险被终止,为了给更高优先级的进程挪地方。

MapReduce之后

MapReduce 是简单,但导致复杂任务从头编写任务繁重。
很多高级编程模型被创造,如 Pig,Hive,Cascading,Crunch;
MapReuce 是非常稳健,但其他工具可能快上几个量级。

物化中间状态

什么是物化中间状态?

  • 每个MapReduce作业都独立于其他任何作业,跨团队合作时,分布式系统的文件只是简单的中间状态(intermediate state)
  • 将这个中间状态写入文件的过程称为物化(materialization)

MapReduce完全物化中间状态与 Unix 管道的比较?

  • Unix管道将一个命令的输出与另一个命令的输入连接起来。管道并没有完全物化中间状态,而是只使用一个小的内存缓冲区,将输出增量地流(stream) 向输入。
  • MapReduce完全物化中间状态的不足:
    • :MapReduce作业只有在前驱作业(生成其输入)中的所有任务都完成时才能启动(慢节点拖慢整个工作流程),而由Unix管道连接的进程会同时启动,输出一旦生成就会被消费。
    • Mapper通常是多余的:它们仅仅是读取刚刚由Reducer写入的同样文件,为下一个阶段的分区和排序做准备。
      • 在许多情况下,Mapper代码可能是前驱Reducer的一部分:如果Reducer和Mapper的输出有着相同的分区与排序方式,那么Reducer就可以直接串在一起,而不用与Mapper相互交织。
    • 复制很浪费:将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,对这些临时数据这么搞就比较过分了。

数据流引擎

为了解决上述问题,开发了数据流引擎:Spark, Tez, Flink。

数据流引擎与 MapReduce 的区别?

  • 数据流引擎的共同点:把整个工作流作为单个作业来处理,而不是把它分解为独立的子作业。
  • 像MapReduce一样,它们在一条线上通过反复调用用户定义的函数来一次处理一条记录,它们通过输入分区来并行化载荷,它们通过网络将一个函数的输出复制到另一个函数的输入。
  • 与MapReduce不同,这些函数不需要严格扮演交织的Map与Reduce的角色,而是可以以更灵活的方式进行组合。我们称这些函数为算子(operators)

数据流引擎提供了哪些不同的选项来将一个算子的输出连接到另一个算子的输入?

  • 一种选项是对记录按键重新分区并排序,就像在MapReduce的混洗阶段一样。
  • 另一种可能是接受多个输入,并以相同的方式进行分区,但跳过排序。当记录的分区重要但顺序无关紧要时,这省去了分区散列连接的工作,因为构建散列表还是会把顺序随机打乱。
  • 对于广播散列连接,可以将一个算子的输出,发送到连接算子的所有分区。

与MapReduce模型相比,数据流引擎有什么优点?

  • 排序等昂贵的工作只需要在实际需要的地方执行,而不是默认地在每个Map和Reduce阶段之间出现。
  • 没有不必要的Map任务,因为Mapper所做的工作通常可以合并到前面的Reduce算子中(因为Mapper不会更改数据集的分区)。
  • 调度程序能够利用局部性进行优化,比如消费数据任务与生成数据的任务放在相同的机器上。
  • 算子间的中间状态足以保存在内存中或写入本地磁盘,这比写入HDFS需要更少的I/O(必须将其复制到多台机器,并将每个副本写入磁盘)。
  • 算子可以在输入就绪后立即开始执行;后续阶段无需等待前驱阶段整个完成后再开始。
  • 与MapReduce(为每个任务启动一个新的JVM)相比,现有Java虚拟机(JVM)进程可以重用来运行新算子,从而减少启动开销。

使用数据流引擎执行与MapReduce工作流同样的计算,通常执行速度要明显快得多。

容错

完全物化中间状态至 HDFS 的优点?

  • 具有持久性,这使得MapReduce中的容错相当容易,任务失败时可以在另一台机器上重启。

Spark,Flink和Tez没有将中间状态写入HDFS,怎么容错?

  • 它们采取了不同的方法来容错:如果一台机器发生故障,并且该机器上的中间状态丢失,则它会从其他仍然可用的数据重新计算(在可行的情况下是先前的中间状态,要么就只能是原始输入数据,通常在HDFS上)。
  • 为了实现这种重新计算,框架必须跟踪一个给定的数据是如何计算的 —— 使用了哪些输入分区?应用了哪些算子? Spark使用弹性分布式数据集(RDD,Resilient Distributed Dataset) 的抽象来跟踪数据的谱系,而Flink对算子状态存档,允许恢复运行在执行过程中遇到错误的算子。

重新计算数据时,计算有不确定行怎么办?

  • 在重新计算数据时,重要的是要知道计算是否是确定性的(相同的输入数据,算子是否始终有相同的输出?)。对于不确定性算子来说,解决方案通常是杀死下游算子,然后再重跑新数据。
  • 为了避免这种级联故障,最好让算子具有确定性。
    • 但是非确定性行为,很容易溜进来。比如哈希表的迭代、随机数。
    • 最好消除掉上述行为,比如随机数设定固定的种子。

重新计算一定是最佳选择吗?

  • 不是
  • 如果中间状态比原始数据小得多,或者计算量大,那么物化中间状态更好。

关于物化的讨论

流引擎的输入和输出与物化。

  • 排序算子必须消费全部的输入后才能输出,因子需要排序的算子都需要至少暂时累积状态。
  • 使用数据流引擎时,HDFS 上的物化数据集通常仍是作业的输入和输出。
  • 比起 MapReduce 的改进是,不用自己把中间状态写入文件了。

图与迭代处理

批处理中为什么会处理图?

  • 目标是在整个图上执行某种离线处理或分析
  • 这种需求经常出现在机器学习应用(如推荐引擎)或排序系统中。如,PageRank。

Spark, Flink 等流处理引擎,把算子作为有向无环图(DAG)的一部分安排在作业中。
而图处理中,数据本身具有图的性质。

许多图算法是通过 BFS/DFS 实现的,MapReduce 实现它很低效,因为 MapReduce 是一条条处理的。

Pregel处理模型

针对图批处理的优化 —— 批量同步并行(BSP,Bulk Synchronous Parallel) 计算模型已经开始流行起来。

  • 其中,Apache Giraph ,Spark的GraphX API和Flink的Gelly API 实现了它。
  • 它也被称为Pregel模型,因为Google的Pregel论文推广了这种处理图的方法。

Pregel 是怎么处理图的?(每个顶点是一个处理)

  • 在MapReduce中,Mapper在概念上向Reducer的特定调用“发送消息”,因为框架将所有具有相同键的Mapper输出集中在一起。
  • Pregel背后有一个类似的想法:一个顶点可以向另一个顶点“发送消息”,通常这些消息是沿着图的边发送的。
  • 在每次迭代中,为每个顶点调用一个函数,将所有发送给它的消息传递给它 —— 就像调用Reducer一样。
  • 与MapReduce的不同之处在于,在Pregel模型中,顶点在一次迭代到下一次迭代的过程中会记住它的状态,所以这个函数只需要处理新的传入消息。如果图的某个部分没有被发送消息,那里就不需要做任何工作。

容错

Pregel作业怎么容错?

  • 消息批处理,且等待通信的次数减少了;每次迭代中发送的消息处理完成才能处理下一轮迭代。
  • 网络丢失、重复、延迟消息时,Pregel 能保证消息在其目标顶点恰好处理一次。
  • 迭代结束时,定期存档所有顶点的状态,持久化存储。

并行执行

  • 图的分区取决于框架——比如顶点运行在哪台机器上,怎么通过网络通信。
    • 理想情况下,大量通信的顶点最好放在同一台机器上。
    • 实践上很难做到,因为图通常按照任意分配的 ID 分区。
  • 图通常有跨机器的额外开销,中间状态(节点之间发送的消息)往往比原始图大。网络发送消息的开销,会显著拖慢分布式图算法的速度。
    • 如果图能放在单机内存中处理,单机算法很可能超过分布式处理。
    • 分布式处理图的并行算法是一个进行中的研究领域。

高级API和语言

目前大规模处理的能力已经具备,在研究的是改进编程模型,提高处理效率,扩大这些技术可以解决的问题集。

  • Hive, Pig, Spark, Flink 等都有自己的高级数据流 API,很方便简单高效。

向声明式查询语言的转变

  • MapReduce 可以自由执行任意代码,但是用户自定义的 UDF 使用起来不方便。
  • 数据流处理引擎发现还是声明式(类 SQL)语言更好,因为内部可以做 join 的优化,列存储,向量化执行等。
  • 批处理框架越来越像 MPP 数据库了(性能也可以媲美)。

专业化的不同领域

  • 传统上,MPP数据库满足了商业智能分析和业务报表的需求
  • 而现在,随着批处理系统获得各种内置功能以及高级声明式算子,且随着MPP数据库变得更加灵活和易于编程,两者开始看起来相似了:最终,它们都只是存储和处理数据的系统。

本章小结

输入输出

  • 在Unix世界中,允许程序与程序组合的统一接口是文件与管道;
  • 在MapReduce中,该接口是一个分布式文件系统。
  • 数据流引擎添加了自己的管道式数据传输机制,以避免将中间状态物化至分布式文件系统,但作业的初始输入和最终输出通常仍是HDFS。

分布式批处理框架需要解决的两个主要问题是:
分区

  • 在MapReduce中,Mapper根据输入文件块进行分区。Mapper的输出被重新分区、排序并合并到可配置数量的Reducer分区中。这一过程的目的是把所有的相关数据(例如带有相同键的所有记录)都放在同一个地方。
  • 后MapReduce时代的数据流引擎若非必要会尽量避免排序,但它们也采取了大致类似的分区方法。

容错

  • MapReduce经常写入磁盘,这使得从单个失败的任务恢复很轻松,无需重新启动整个作业,但在无故障的情况下减慢了执行速度。
  • 数据流引擎更多地将中间状态保存在内存中,更少地物化中间状态,这意味着如果节点发生故障,则需要重算更多的数据。
  • 确定性算子减少了需要重算的数据量。

讨论了几种MapReduce的连接算法:
排序合并连接
每个参与连接的输入都通过一个提取连接键的Mapper。通过分区、排序和合并,具有相同键的所有记录最终都会进入相同的Reducer调用。这个函数能输出连接好的记录。
广播散列连接
两个连接输入之一很小,所以它并没有分区,而且能被完全加载进一个哈希表中。因此,你可以为连接输入大端的每个分区启动一个Mapper,将输入小端的散列表加载到每个Mapper中,然后扫描大端,一次一条记录,并为每条记录查询散列表。
分区散列连接
如果两个连接输入以相同的方式分区(使用相同的键,相同的散列函数和相同数量的分区),则可以独立地对每个分区应用散列表方法。

批处理作业的显著特点是,它读取一些输入数据并产生一些输出数据,但不修改输入—— 换句话说,输出是从输入衍生出的。最关键的是,输入数据是有界的(bounded):它有一个已知的,固定的大小(例如,它包含一些时间点的日志文件或数据库内容的快照)。因为它是有界的,一个作业知道自己什么时候完成了整个输入的读取,所以一个工作在做完后,最终总是会完成的。


10:衍生数据-批处理
https://blog.longpi1.com/2024/07/29/10:衍生数据-批处理/