1 2 3 4
>> 返回首页
数据质量监控管理系统
互联网舆情监控系统
思数客户满意度测评系统
思数商业智能与分析
当前位置:>> 解决方案 >> 文章详情
Hadoop 在Last.fm的应用
来源:思数云技术服务中心 时间:2013-5-29 14:58:16

Hadoop应用实例
--《Hadoop权威指南》连载系列之第16章
2011年09月22日10:23 it168网站原创 作者:(美) Tom White 编辑:唐蓉 评论:2条
  Hadoop 在Last.fm的应用
  Last.fm:社会音乐史上的革命
  Last.fm创办于2002年,它是一个提供网络电台和网络音乐服务的社区网站,向用户提供很多服务,例如免费听音乐和音乐下载,音乐及重大事件推荐,个性化图表服务以及其他很多服务。每个月大约有2500万人使用Last.fm,因而产生大量需要处理的数据。一个例子就是用户传输他们正在收听的音乐信息(也就是收藏 “scrobbling”)。Last.fm处理并且存储这些数据,以便于用户可以直接访问这些数据(用图表的形式),并且可以利用这些数据来推断用户的个人音乐品味、喜好和喜爱的艺术家,然后用于寻找相似的音乐。
  Hadoop在Last.fm中的应用
  随着Last.fm服务的发展,用户数目从数千增长到数百万,这时,存储、处理和管理这些用户数据渐渐变成一项挑战。幸运的是,当大家认识到Hadoop技术能解决众多问题之后,Hadoop的性能迅速稳定下来,并被大家积极地运用。2006年初,Last.fm开始使用Hadoop,几个月之后便投入实际应用。Last.fm使用Hadoop的理由归纳如下。
  分布式文件系统为它所存储的数据(例如,网志,用户收听音乐的数据)提供冗余备份服务而不增加额外的费用。
  可以方便地通过增添便宜、普通的硬件来满足可扩展性需求。
  当时Last.fm财力有限,Hadoop是免费的。
  开源代码和活跃的社区团体意味着Last.fm能够自由地修改Hadoop,从而增添一些自定义特性和补丁。
  Hadoop提供了一个弹性的容易掌握的框架来进行分布式计算。
  现在,Hadoop已经成为Last.fm基础平台的关键组件,目前包括2个Hadoop集群,涉及50台计算机、300个内核和100 TB的硬盘空间。在这些集群上,运行着数百种执行各种操作的日常作业,例如日志文件分析、A/B测试评测、即时处理和图表生成。本节的例子将侧重于介绍产生图表的处理过程,因为这是Last.fm对Hadoop的第一个应用,它展示出Hadoop在处理大数据集时比其他方法具有更强的功能性和灵活性。
  用Hadoop产生图表
  Last.fm使用用户产生的音轨收听数据来生成许多不同类型的图表,例如针对每个国家或个人音轨数据的一周汇总图表。许多Hadoop程序处理收听数据和产生这些图表,它们可以以天、周或月为单位执行。图16-1展示了这些数据在网站上如何显示的一个例子,本例是音乐的周排行统计数据。
  
  ▲图16-1. Last.fm音乐排行统计图表
  通常情况下,Last.fm有两种收听信息。
  用户播放自己的音乐(例如,在PC机或其他设备上听MP3文件),这种信息通过Last.fm的官方客户端应用或一种第三方应用 (有上百种)发送到Last.fm。
  用户收听Last.fm某个网络电台的节目,并在本地计算机上通过流技术缓冲一首歌。Last.fm播放器或站点能被用来访问这些流数据,然后它能给用户提供一些额外的功能,比如允许用户对她收听的音频进行喜爱、跳过或禁止等操作。
  在处理接收到的数据时,我们对它们进行分类:一类是用户提交的收听的音乐数据从现在开始,第一类数据称为“scrobble”(收藏数据);另一类是用户收听的Last.fm的电台数据(从现在开始,第二类数据称为“radio listen”(电台收听数据)。为了避免Last.fm的推荐系统出现信息反馈循环的问题,对数据源的区分是非常重要的,而Last.fm的推荐系统只使用scrobble数据。Last.fm的Hadoop程序的一项重要任务就是接受这些收听数据,做统计并形成能够在Last.fm网站上进行显示和作为其他Hadoop程序输入的数据格式。这一过程是Track Statistics(音轨统计)程序实现的,它就是在以下几节描述的实例。
  Track Statistics程序
  音乐收听信息被发送到Last.fm时,会经历验证和转换阶段,最终结果是一系列由空格分隔的文本文件,包含的信息有用户ID(userId)、音乐(磁道)ID(trackId)、这首音乐被收藏的次数(Scrobble)、这首音乐在电台中收听的次数(Radio)以及被选择跳过的次数(Skip)。表16-1包含一些采样的收听数据,后面介绍的例子将用到这些数据,它是Track Statistics程序的输入(真实数据达GB数量级,并且具有更多的属性字段,为了方便介绍,这里省略了其他的字段)。
  
  ▲表16-1. 收听数据
  这些文本文件作为初始输入提供给Track Statistics程序,它包括利用这个输入数据计算各种数据值的两个作业和一个用来合并结果的作业(见图16-2)。
  Unique Listeners作业模块统计收听同一首音频的不同用户数,通过累计不同用户对该音频文件的第一次访问而忽略同一用户对这一文件的多次访问,即可得到该数值。Sum作业模块通过对所有用户的所有收听信息进行计数来为每个音频统计收听总数、收藏总数、电台收听总数以及被跳过的总数。
  
  ▲图16-2. 音频状态统计作业
  尽管这两个作业模块的输入格式是相同的,我们仍然需要两个作业模块,因为Unique Listeners作业模块负责为每个用户对每个音频产生统计值,而Sum作业模块为每个音频产生统计值。最后Merge作业模块负责合并由这两个模块产生的中间输出数据得到最终统计结果。运行这段程序的最终结果是对每个音频产生以下几项数值:
  不同的听众数
  音频的收藏次数
  音频在电台中的点播次数
  音频在电台中被收听的总次数
  音频在电台广播中被跳过的次数
  下面我们将详细介绍每个作业模块和它的MapReduce阶段。请注意,由于篇幅有限,所提供的代码段已被简化。要想下载完整的代码,请参考本书“前言”。
  计算不同的听众数
  Unique Listeners作业模块用于计算每个音频的不同收听用户数。
  UniqueListenerMaper UniqueListenerMaper程序处理用空格分隔的原始收听数据,然后对每个track ID(音频ID)产生相应的user ID(用户ID):
  public void map(LongWritable position, Text rawLine, OutputCollector
  IntWritable> output, Reporter reporter) throws IOException {
  String[] parts = (rawLine.toString()).split(" ");
  int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]);
  int radioListens = Integer.parseInt(parts[TrackStatisticsProgram.COL_RADIO]);
  // if track somehow is marked with zero plays - ignore
  if (scrobbles <= 0 && radioListens <= 0) {
  return;
  }
  // if we get to here then user has listened to track,
  // so output user id against track id
  IntWritable trackId = new IntWritable(
  Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID]));
  IntWritable userId = new IntWritable(
  Integer.parseInt(parts[TrackStatisticsProgram.COL_USERID]));
  output.collect(trackId, userId);
  }
  UniqueListenersReducer UniqueListenersReducer接收到每个track ID对应的user ID数据列表之后,把这个列表放入Set类型对象以消除重复的用户ID数据。然后输出每个track ID对应的这个集合的大小(不同用户数)。但是如果某个键对应的值太多,在set对象中存储所有的reduce值可能会有内存溢出的危险。实际上还没有出现过这个问题,但是为了避免这一问题,我们可以引入一个额外的MapReduce处理步骤来删除重复数据或使用辅助排序的方法(详细内容请参考第241页的“辅助排序”小节)。
  public void reduce(IntWritable trackId, Iterator values,
  OutputCollector output, Reporter reporter)
  throws IOException {
  Set userIds = new HashSet();
  // add all userIds to the set, duplicates automatically removed (set contract)
  while (values.hasNext()) {
  IntWritable userId = values.next();
  userIds.add(Integer.valueOf(userId.get()));
  }
  // output trackId -> number of unique listeners per track
  output.collect(trackId, new IntWritable(userIds.size()));
  }
  表16-2是这一作业模块的样本输入数据。map输出结果如表16-3所示,reduce输出结果如表16-4所示。
  表16-2. 作业的输入
  

  表16-3. map输出
  
  ▲
  表16-4. reduce输出
  
  ▲
  统计音频使用总数
  Sum作业相对简单,它只为每个音轨累计我们感兴趣的数据。
  SumMapper 输入数据仍然是原始文本文件,但是这一阶段对输入数据的处理完全不同。期望的输出结果是针对每个音轨的一系列累计值(不同用户、播放次数、收藏次数、电台收听次数和跳过次数)。为了方便处理,我们使用一个由Hadoop Record I/O类产生的TrackStats中间对象,它实现了WritableComparable方法(因此可被用作输出)来保存这些数据。mapper创建一个TrackStats对象,根据文件中的每一行数据对它进行值的设定,但是“不同的用户数”(unique listener count)这一项没有填写(这项数据由merge作业模块填写)。
  public void map(LongWritable position, Text rawLine,
  OutputCollector output, Reporter reporter)
  throws IOException {
  String[] parts = (rawLine.toString()).split(" ");
  int trackId = Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID]);
  int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]);
  int radio = Integer.parseInt(parts[TrackStatisticsProgram.COL_RADIO]);
  int skip = Integer.parseInt(parts[TrackStatisticsProgram.COL_SKIP]);
  // set number of listeners to 0 (this is calculated later)
  // and other values as provided in text file
  TrackStats trackstat = new TrackStats(0, scrobbles + radio, scrobbles, radio, skip);
  output.collect(new IntWritable(trackId), trackstat);
  }
  SumReducer 在这一过程,reducer执行和mapper相似的函数——对每个音频使用总数情况进行统计,然后返回一个总的统计数据:
  public void reduce(IntWritable trackId, Iterator values,
  OutputCollector output, Reporter reporter)
  throws IOException {
  TrackStats sum = new TrackStats(); // holds the totals for this track
  while (values.hasNext()) {
  TrackStats trackStats = (TrackStats) values.next();
  sum.setListeners(sum.getListeners() + trackStats.getListeners());
  sum.setPlays(sum.getPlays() + trackStats.getPlays());
  sum.setSkips(sum.getSkips() + trackStats.getSkips());
  sum.setScrobbles(sum.getScrobbles() + trackStats.getScrobbles());
  sum.setRadioPlays(sum.getRadioPlays() + trackStats.getRadioPlays());
  }
  output.collect(trackId, sum);
  }
  表16-5是这个部分作业的输入数据(和Unique Listener作业模块的输入一样)。map的输出结果如表16-6所示,reduce的输出结果如表16-7所示。
  表16-5. 作业输入
  
  ▲
  表16-6. map输出
  
  ▲
  表16-7. reduce 输出
  
  ▲
  合并结果
  最后一个作业模块需要合并前面两个作业模块产生的输出数据:每个音频对应的不同用户数和每个音频的使用统计信息。为了能够合并这两种不同的输入数据,我们采用了两个不同的mapper(对每一种输入定义一个)。两个中间作业模块被配置之后可以把他们的输出结果写入路径不同的文件,MultipleInputs类用于指定mapper和文件的对应关系。下面的代码展示了作业的JobConf对象是如何设置来完成这一过程的:
  MultipleInputs.addInputPath(conf, sumInputDir,
  SequenceFileInputFormat.class, IdentityMapper.class);
  MultipleInputs.addInputPath(conf, listenersInputDir,
  SequenceFileInputFormat.class, MergeListenersMapper.class);
  虽然单用一个mapper也能处理不同的输入,但是示范解决方案更方便,更巧妙。
  MergeListenersMapper 这个mapper用来处理UniqueListenerJob输出的每个音轨的不同用户数据。它采用和SumMapper相似的方法创建TrackStats对象,但这次它只填写每个音轨的不同用户数信息,不管其他字段:
  public void map(IntWritable trackId, IntWritable uniqueListenerCount,
  OutputCollector output, Reporter reporter)
  throws IOException {
  TrackStats trackStats = new TrackStats();
  trackStats.setListeners(uniqueListenerCount.get());
  output.collect(trackId, trackStats);
  }
  表16-8是mapper的一些输入数据;表16-9是对应的输出结果。
  表16-8. MergeListenersMapper的输入
  
  ▲
  表16-9. MergeListenersMapper的输出
  
  ▲
  IdentityMapper IdentityMapper被配置用来处理SumJob输出的TrackStats对象,因为不要求对数据进行其他处理,所以它直接输出输入数据(见表16-10)。
  表16-10. IdentityMapper的输入和输出
  
  ▲
  SumReducer 前面两个mapper产生同一类型的数据:每个音轨对应一个TrackStats对象,只是数据赋值不同。最后的reduce阶段能够重用前面描述的SumReducer来为每个音轨创建一个新的TrackStats对象,它综合前面两个TrackStats对象的值,然后输出结果(见表16-11)。
  表16-11. SumReducer的最终输出
  
  ▲
  最终输出文件被收集后复制到服务器端,在这里一个Web服务程序使Last.fm网站能得到并展示这些数据。如图16-3所示,这个网页展示了一个音频的使用统计信息:接听者总数和播放总次数。
  
  ▲图16-3. TrackStats结果
  总结
  Hadoop已经成为Last.fm基础框架的一个重要部件,它用于产生和处理各种各样的数据集,如网页日志信息和用户收听数据。为了让大家能够掌握主要的概念,这里讲述的例子已经被大大地简化;在实际应用中输入数据具有更复杂的结构并且数据处理的代码也更加繁琐。虽然Hadoop本身已经足够成熟可以支持实际应用,但它仍在被大家积极地开发,并且每周Hadoop社区都会为它增加新的特性并提升它的性能。Last.fm很高兴是这个社区的一分子,我们是代码和新想法的贡献者,同时也是对大量开源技术进行利用的终端用户。
  (作者:Adrian Woodhead和Marc de Palol)

 

©版权所有:2012-2014 北京思数科技有限公司
地址:北京海淀区上地软件园; 东直门东方银座; 海淀五道口清华大学东门计算中心; 上海市浦东新区郭守敬路498号浦东软件园; 广州海珠区中山大学国家科技软件园
电话:13911486420 京ICP备13022478号