MyBlog

【Paper】MapReduce: Simplified Data Processing on Large Clusters

last modified: 2020-11-21 18:58

Google要做网页搜索,涉及到大量网页排名,单机已经无法满足计算需求,因此只能写分布式程序,使计算在多台机器执行。分布式程序开发会遇到诸多难题,如数据分区,程序调度,处理节点宕机,处理节点间通信等等。

MapReduce是一种编程模型,它的出现帮助我们屏蔽了分布式程序的实现细节,简化了大规模数据并行计算的过程。

一、模型

用户程序fork出一个master,多个worker;master负责task调度,worker负责task执行,task有map task和reduce task。整个计算过程称为一个Job。

输入被切分成M个文件(抽象,可以是数据库等任何实现Reader接口的对象)

被分配map task的worker,读取切分的文件,解析K/V,传入map函数,输出K/V到中间文件。

输出的文件分区成R个文件,这个过程叫shuffle。文件生成后通知master,master将其通知给reduce workers。

reduce worker通过RPC取到中间文件,reduce需要从多个节点取数据,当取完所有数据后进行计算。

结果输出到文件。

当所有map和reduce task完成,master将结果返回用户。

二、实例

倒排索引实例:

map函数解析document,生成的中间文件。

给定一个word,reduce收集所有doc_id,生成 pair,所有的pairs构成倒排索引。

reduce负责收集的word范围,如hash(word) / number of R.

三、容错

  1. master失败,整个Job失败。(实际中,可以有一个standby的master,处理master故障的情况,但因为涉及到master间状态同步,性能可能有损失)

  2. worker失败,master通过心跳机制检测到worker离线,所有该worker完成的task被标记为idle状态,该状态task被重新调度。

当一个map task先被A worker执行,后被B worker执行(因为A worker失败),所有reduce task会收到通知,改由从worker B读取数据。

同一个map任务master收到两次通知,则忽略第二次。因为它是已经被完成过(因为worker失败,生成过两个一模一样的map task,我们只需要一个即可)

reduce task亦是这样的。

这个机制在Google,依赖的GFS rename的原子性,首先map函数和reduce函数生成的结果在临时文件,当任务完成时,rename到正常文件,这个操作是原子性的。

后完成的task,rename会失败,因为文件已经不存在了。这里的关键是GFS raname的原子性。

四、性能优化

  • map就近读取

带宽在2004年是个瓶颈,GFS和MapReduce worker在同一个cluster中。map task调度时,master可分配其读取local磁盘的切分文件。

  • 任务粒度

map task要多于worker数量,如有2K个worker,可以生成200K个map task。这样减小单个map task的重试时间,减小长尾任务影响。另外使快的节点做更多任务,慢的节点做更少任务。

  • backup tasks

因为慢节点的存在,任务收尾会比较慢,分配更小粒度的task可以缓解。但也有其它方案,

在剩余最后一定比例的map或reduce tasks时,我们认为他们在慢的节点上执行,慢节点成为整个Job的短板。

所以master此时起一部分相同的任务,谁先完成用谁的结果,来将慢task收敛。

  • combiner

map完成后,比如我们会生成这样的K/V pair中间文件:

<apple, 1>

<apple, 1>

<banana, 1>

这样会浪费带宽,我们可以把reduce函数提前在这里应用,结果成为:

<apple, 2>

<banana, 1>

这个过程,叫做combiner。其实就是在map阶段局部执行reduce,减小reduce阶段拉取数据的带宽使用。

  • 输入与输出类型

我们前面介绍的都是文件读写,其实输入可以是任意的,可以是数据库等,只要实现Reader。写入也可以是自定义的,不一定是GFS,只要实现了Writer。

  • 错误处理

map或reduce总有可能出错,因为代码的bug,脏数据不完善。

worker进程都会注册bus error或segmentation的错误处理函数,当处理到脏数据时,通过UDP将seq number上报到master,同一行数据发生多次错误时,master调度task重跑时会忽略掉这条数据。

  • 本地执行

分布式系统里面,bug不好调。所以Google的设计支持让worker,master都支行在同一个机器,即Local模式,以方便调试。

  • 状态信息

master收集task等状态数据,通过http提供访问,使可视化的任务执行流程,如map task个数,耗时等等。

Counter计数,可以统计map/reduce task处理的消息条数,上报到master。也是供观察Job运行状态。


再啰嗦一下,

map是给定任何格式数据,生成K/V

reduce是将map生成的K/V,group by K