若谷学院
互联网公司技术架构分享

美团外卖客户端高可用建设体系

美团点评阅读(149)

背景

美团外卖从2013年11月开始起步,经过数年的高速发展,一直在不断地刷新着记录。2018年5月19日,日订单量峰值突破2000万单,已经成为全球规模最大的外卖平台。业务的快速发展对系统稳定性提出了更高的要求,如何为线上用户提供高稳定的服务体验,保障全链路业务和系统高可用运行,不仅需要后端服务支持,更需要在端上提供全面的技术保障。而相对服务端而言,客户端运行环境千差万别,不可控因素多,面对突发问题应急能力差。因此,构建客户端的高可用建设体系,保障服务稳定高可用,不仅是对工程师的技术挑战,也是外卖平台的核心竞争力之一。

高可用建设体系的思路

一个设计良好的大型客户端系统往往是由一系列各自独立的小组共同开发完成的,每一个小组都应当具有明确定义的的职责划分。各业务模块之间推行“松耦合”开发模式,让业务模块拥有隔离式变更的能力,是一种可以同时提升开发灵活性和系统健壮性的有效手段。这是美团外卖整体的业务架构,整体上以商品交易链路(门店召回,商品展示,交易)为核心方向进行建设,局部上依据业务特点和团队分工分成多个可独立运维单元单独维护。可独立运维单元的简单性是可靠性的前提条件,这使得我们能够持续关注功能迭代,不断完成相关的工程开发任务。

图片1

我们将问题依照生命周期划分为三个阶段:发现、定位、解决,围绕这三个阶段的持续建设,构成了美团外卖高可用建设体系的核心。

美团外卖质量保障体系全景图

这是美团外卖客户端整体质量体系全景图。整体思路:监控报警,日志体系,容灾。

图片2

通过采集业务稳定性,基础能力稳定性,性能稳定性三大类指标数据并上报,衡量客户端系统质量的标准得以完善;通过设立基线,应用特定业务模型对这一系列指标进行监控报警,客户端具备了分钟级感知核心链路稳定性的能力;而通过搭建日志体系,整个系统有了提取关键线索能力,多维度快速定位问题。当问题一旦定位,我们就能通过美团外卖的线上运维规范进行容灾操作:降级,切换通道或限流,从而保证整体的核心链路稳定性。

监控&报警

监控系统,处于整个服务可靠度层级模型的最底层,是运维一个可靠的稳定系统必不可少的重要组成部分。为了保障全链路业务和系统高可用运行,需要在用户感知问题之前发现系统中存在的异常,离开了监控系统,我们就没有能力分辨客户端是不是在正常提供服务。

图片3

按照监控的领域方向,可以分成系统监控与业务监控。
系统监控,主要用于基础能力如端到端成功率,服务响应时长,网络流量,硬件性能等相关的监控。系统监控侧重在无业务侵入和定制系统级别的监控,更多侧重在业务应用的底层,多属于单系统级别的监控。
业务监控,侧重在某个时间区间,业务的运行情况分析。业务监控系统构建于系统监控之上,可以基于系统监控的数据指标计算,并基于特定的业务介入,实现多系统之间的数据联合与分析,并根据相应的业务模型,提供实时的业务监控与告警。按照业务监控的时效性,可以继续将其细分成实时业务监控与离线业务监控。

  • 实时业务监控,通过实时的数据采集分析,帮助快速发现及定位线上问题,提供告警机制及介入响应(人工或系统)途径,帮助避免发生系统故障。
  • 离线的业务监控,对一定时间段收集的数据进行数据挖掘、聚合、分析,推断出系统业务可能存在的问题,帮助进行业务上的重新优化或改进的监控。

美团外卖的业务监控,大部分属于实时业务监控。借助美团统一的系统监控建设基础,美团外卖联合公司其他部门将部分监控基础设施进行了改造、共建和整合复用,并打通形成闭环(监控,日志,回捞),我们构建了特定符合外卖业务流程的实时业务监控; 而离线的业务监控,主要通过用户行为的统计与业务数据的挖掘分析,来帮助产品设计,运营策略行为等产生影响,目前这部分监控主要由美团外卖数据组提供服务。值得特别说明的是单纯的信息汇总展示,无需或无法立即做出介入动作的业务监控,可以称之为业务分析,如特定区域的活动消费情况、区域订单数量、特定路径转换率、曝光点击率等,除非这些数据用来决策系统实时状态健康情况,帮助产生系统维护行为,否则这部分监控由离线来处理更合适。

图片4

我们把客户端稳定性指标分为3类维度:业务稳定性指标,基础能力稳定性指标,性能稳定性指标。对不同的指标,我们采用不同的采集方案进行提取上报,汇总到不同系统;在设定完指标后,我们就可以制定基线,并依照特定的业务模型制定报警策略。美团外卖客户端拥有超过40项度量质量指标,其中25项指标支持分钟级别报警。报警通道依据紧急程度支持邮件,IM和短信三条通道。因此,我们团队具备及时发现影响核心链路稳定性的关键指标变化能力。

一个完善的监控报警系统是非常复杂的,因此在设计时一定要追求简化。以下是《Site Reliability Engineering: How Google Runs Production Systems》一书中提到的告警设置原则:

最能反映真实故障的规则应该可预测性强,非常可靠,并且越简单越好
不常用的数据采集,汇总以及告警配置应该定时清除(某些SRE团队的标准是一季度未使用即删除)
没有暴露给任何监控后台、告警规则的采集数据指标应该定时清除

通过监控&报警系统,2017年下半年美团外卖客户端团队共发现影响核心链路稳定性超过20起问题:包括爬虫、流量、运营商403问题、性能问题等。目前,所有问题均已全部改造完毕。

日志体系

监控系统的一个重要特征是生产紧急告警。一旦出现故障,需要有人来调查这项告警,以决定目前是否存在真实故障,是否需要采取特定方法缓解故障,直至查出导致故障的问题根源。

简单定位和深入调试的过程必须要保持非常简单,必须能够被团队中任何一个人所理解。日志体系,在简化这一过程中起到了决定性作用。

图片5

美团外卖的日志体系总体分为3大类:即全量日志系统,个体日志系统,异常日志系统。全量日志系统,主要负责采集整体性指标,如网络可用性,埋点可用性,我们可以通过他了解到系统整体大盘,了解整体波动,确定问题影响范围;异常日志系统,主要采集异常指标,如大图问题,分享失败,定位失败等,我们通过他可以迅速获取异常上下文信息,分析解决问题;而个体日志系统,则用于提取个体用户的关键信息,从而针对性的分析特定客诉问题。这三类日志,构成了完整的客户端日志体系。

图片2

日志的一个典型使用场景是处理单点客诉问题,解决系统潜在隐患。个体日志系统,用于简化工程师提取关键线索步骤,提升定位分析问题效率。在这一领域,美团外卖使用的是点评平台开发的Logan服务。作为美团移动端底层的基础日志库,Logan接入了集团众多日志系统,例如端到端日志、用户行为日志、代码级日志、崩溃日志等,并且这些日志全部都是本地存储,且有多重加密机制和严格的权限审核机制,在处理用户客诉时才对数据进行回捞和分析,保证用户隐私安全。

通过设计和实施美团外卖核心链路日志方案,我们打通了用户交易流程中各系统如订单,用户中心,Crash平台与Push后台之间的底层数据同步;通过输出标准问题分析手册,针对常见个体问题的分析和处理得以标准化;通过制定日志捞取SOP并定期演练,线上追溯能力大幅提升,日常客诉绝大部分可在30分钟内定位原因。在这一过程中,通过个体暴露出影响核心链路稳定性的问题也均已全部改进/修复。

故障排查是运维大型系统的一项关键技能。采用系统化的工具和手段而不仅仅依靠经验甚至运气,这项技能是可以自我学习,也可以内部进行传授。

容灾备份

针对不同级别的服务,应该采取不同的手段进行有效止损。非核心依赖,通过降级向用户提供可伸缩的服务;而核心依赖,采用多通道方式进行依赖备份容灾保证交易路径链路的高可用;异常流量,通过多维度限流,最大限度保证业务可用性的同时,给予用户良好的体验。总结成三点,即:非核心依赖降级、核心依赖备份、过载保护限流。接下来我们分别来阐述这三方面。

降级

图片6

在这里选取美团外卖客户端整体系统结构关系图来介绍非核心依赖降级建设概览。图上中间红色部分是核心关键节点,即外卖业务的核心链路:定位,商家召回,商品展示,下单;蓝色部分,是核心链路依赖的关键服务;黄色部分,是可降级服务。我们通过梳理依赖关系,改造前后端通讯协议,实现了客户端非核心依赖可降级;而后端服务,通过各级缓存,屏蔽隔离策略,实现了业务模块内部可降级,业务之间可降级。这构成了美团外卖客户端整体的降级体系。

右边则是美团外卖客户端业务/技术降级开关流程图。通过推拉结合,缓存更新策略,我们能够分钟级别同步降级配置,快速止损。

目前,美团外卖客户端有超过20项业务/能力支持降级。通过有效降级,我们避开了1次S2级事故,多次S3、S4级事故。此外,降级开关整体方案产出SDK horn,推广至集团酒旅、金融等其他核心业务应用。

备份

核心依赖备份建设上,在此重点介绍美团外卖多网络通道。网络通道,作为客户端的最核心依赖,却是整个全链路体系最不可控的部分,经常出现问题:网络劫持,运营商故障,甚至光纤被物理挖断等大大小小的故障严重影响了核心链路的稳定性。因此,治理网络问题,必须要建设可靠的多通道备份。

图片7

这是美团外卖多网络通道备份示意图。美团外卖客户端拥有Shark、HTTP、HTTPS、HTTP DNS等四条网络通道:整体网络以Shark长连通道为主通道,其余三条通道作为备份通道。配合完备的开关切换流程,可以在网络指标发生骤降时,实现分钟级别的分城市网络通道切换。而通过制定故障应急SOP并不断演练,提升了我们解决问题的能力和速度,有效应对各类网络异常。我们的网络通道开关思路也输出至集团其他部门,有效支持了业务发展。

限流

服务过载是另一类典型的事故。究其原因大部分情况下都是由于少数调用方调用的少数接口性能很差,导致对应服务的性能恶化。若调用端缺乏有效降级容错,在某些正常情况下能够降低错误率的手段,如请求失败后重试,反而会让服务进一步性能恶化,甚至影响本来正常的服务调用。

美团外卖业务在高峰期订单量已达到了相当高的规模量级,业务系统也及其复杂。根据以往经验,在业务高峰期,一旦出现异常流量疯狂增长从而导致服务器宕机,则损失不可估量。

因此,美团外卖前后端联合开发了一套“流量控制系统”,对流量实施实时控制。既能日常保证业务系统稳定运转,也能在业务系统出现问题的时候提供一套优雅的降级方案,最大限度保证业务的可用性,在将损失降到最低的前提下,给予用户良好的体验。

图片8

整套系统,后端服务负责识别打标分类,通过统一的协议告诉前端所标识类别;而前端,通过多级流控检查,对不同流量进行区分处理:弹验证码,或排队等待,或直接处理,或直接丢弃。
面对不同场景,系统支持多级流控方案,可有效拦截系统过载流量,防止系统雪崩。此外,整套系统拥有分接口流控监控能力,可对流控效果进行监控,及时发现系统异常。整套方案在数次异常流量增长的故障中,经受住了考验。

发布

随着外卖业务的发展,美团外卖的用户量和订单量已经达到了相当的量级,在线直接全量发布版本/功能影响范围大,风险高。
版本灰度和功能灰度是一种能够平滑过渡的发布方式:即在线上进行A/B实验,让一部分用户继续使用产品(特性)A,另一部分用户开始使用产品(特性)B。如果各项指标平稳正常,结果符合预期,则扩大范围,将所有用户都迁移到B上来,否则回滚。灰度发布可以保证系统的稳定,在初试阶段就可以发现问题,修复问题,调整策略,保证影响范围不被扩散。

美团外卖客户端在版本灰度及功能灰度已较为完善。
版本灰度 iOS采用苹果官方提供的分阶段发布方式,Android则采用美团自研的EVA包管理后台进行发布。这两类发布均支持逐步放量的分发方式。
功能灰度 功能发布开关配置系统依据用户特征维度(如城市,用户ID)发布,并且整个配置系统有测试和线上两套不同环境,配合固定的上线窗口,保证上线的规范性。
对应的,相应的监控基础设施也支持分用户特征维度(如城市,用户ID)监控,避免了那些无法在整体大盘体现的灰度异常。此外,无论版本灰度或功能灰度,我们均有相应最小灰度周期和回滚机制,保证整个灰度发布过程可控,最小化问题影响。

线上运维

图片9

在故障来临时如何应对,是整个质量保障体系中最关键的环节。没有人天生就能完美的处理紧急情况,面对问题,恰当的处理需要平时不断的演练。
围绕问题的生命周期,即发现、定位、解决(预防),美团外卖客户端团队组建了一套完备的处理流程和规范来应对影响链路稳定性的各类线上问题。整体思路:建立规范,提前建设,有效应对,事后总结。在不同阶段用不同方式解决不同问题,事前确定完整的事故流程管理策略,并确保平稳实施,经常演练,问题的平均恢复时间大大降低,美团外卖核心链路的高稳定性才能够得以保障。

未来展望

当前美团外卖业务仍然处于快速增长期。伴随着业务的发展,背后支持业务的技术系统也日趋复杂。在美团外卖客户端高可用体系建设过程中,我们希望能够通过一套智能化运维系统,帮助工程师快速、准确的识别核心链路各子系统异常,发现问题根源,并自动执行对应的异常解决预案,进一步缩短服务恢复时间,从而避免或减少线上事故影响。

诚然,业界关于自动化运维的探索有很多,但多数都集中在后台服务领域,前端方向成果较少。我们外卖技术团队目前也在同步的探索中,正处于基础性建设阶段,欢迎更多业界同行跟我们一起讨论、切磋。

参考资料

  1. Site Reliability Engineering: How Google Runs Production Systems
  2. 美团点评移动端基础日志库——Logan
  3. 美团点评移动网络优化实践

作者简介

陈航,美团高级技术专家。2015年加入美团,目前负责美团外卖iOS团队,对移动端架构演进,监控报警备份容灾,移动端线上运维等领域有深刻理解。

富强,美团资深工程师。2015年加入美团,是外卖iOS的早期开发者之一,目前作为美团外卖iOS基础设施小组负责人,负责外卖基础设施及广告运营相关业务。

徐宏,美团高级工程师。2016年加入美团,目前作为外卖iOS团队主力开发,负责移动端APM性能监控,高可用基础设施支撑相关推进工作。

招聘

美团外卖长期招聘iOS、Android、FE高级/资深工程师和技术专家,可Base在北京、上海、成都,欢迎有兴趣的同学将简历发送至chenhang03#meituan.com。

原文出自:https://tech.meituan.com/archives

大众点评账号业务高可用进阶之路

美团点评阅读(142)

引言

在任何一家互联网公司,不管其主营业务是什么,都会有一套自己的账号体系。账号既是公司所有业务发展留下的最宝贵资产,它可以用来衡量业务指标,例如日活、月活、留存等,同时也给不同业务线提供了大量潜在用户,业务可以基于账号来做用户画像,制定各自的发展路径。因此,账号服务的重要性不言而喻,同时美团业务飞速发展,对账号业务的可用性要求也越来越高。本文将分享一些我们在高可用探索中的实践。

衡量一个系统的可用性有两个指标:1. MTBF (Mean Time Between Failure) 即平均多长时间不出故障;2. MTTR (Mean Time To Recovery) 即出故障后的平均恢复时间。通过这两个指标可以计算出可用性,也就是我们大家比较熟悉的“几个9”。

可用性公式

因此提升系统的可用性,就得从这两个指标入手,要么降低故障恢复的时间,要么延长不出故障的时间。

1. 业务监控

要降低故障恢复的时间,首先得尽早的发现故障,然后才能解决故障,这些故障包括系统内和系统外的,这就需要依赖业务监控系统。

业务监控不同于其他监控系统,业务监控关注的是各个业务指标是否正常,比如账号的登录曲线。大众点评登录入口有很多,从终端上分有App、PC、M站,从登录类型上分有密码登录、快捷登录、第三方登录(微信/QQ/微博)、小程序登录等。需要监控的维度有登录总数、成功数、失败分类、用户地区、App版本号、浏览器类型、登录来源Referer、服务所在机房等等。业务监控最能从直观上告诉我们系统的运行状况。

由于业务监控的维度很多很杂,有时还要增加新的监控维度,并且告警分析需要频繁聚合不同维度的数据,因此我们采用ElasticSearch作为日志存储。整体架构如下图:

每条监控都会根据过去的业务曲线计算出一条基线(见下图),用来跟当前数据做对比,超出设定的阈值后就会触发告警。

监控曲线图

每次收到告警,我们都要去找出背后的原因,如果是流量涨了,是有活动了还是被刷了?如果流量跌了,是日志延时了还是服务出问题了?另外值得重视的是告警的频次,如果告警太多就会稀释大家的警惕性。我们曾经踩过一次坑,因为告警太多就把告警关了,结果就在关告警的这段时间业务出问题了,我们没有及时发现。为了提高每条告警的定位速度,我们在每条告警后面加上维度分析。如下图(非真实数据),告警里直接给出分析结果。

监控告警分析图

其实业务监控也从侧面反映出一个系统的可用性,所谓服务未动,监控先行。

2. 柔性可用

柔性可用的目的是延长不出故障的时间,当业务依赖的下游服务出故障时不影响自身的核心功能或服务。账号对上层业务提供的鉴权和查询服务即核心服务,这些服务的QPS非常高,业务方对它们的可用性要求也很高,别说是服务故障,就连任何一点抖动都是不能接受的。对此我们先从整体架构上把服务拆分,其次在服务内对下游依赖做资源隔离,都尽可能的缩小故障发生时的影响范围。

另外对非关键路径上的服务故障做了降级。例如账号的一个查询服务依赖Redis,当Redis抖动的时候服务的可用性也随之降低,我们通过公司内部另外一套缓存中间件Cellar来做Redis的备用存储,当检测到Redis已经非常不可用时就切到Cellar上。通过开源组件Hystrix或者我们公司自研的中间件Rhino就能非常方便地解决这类问题,其原理是根据最近一个时间窗口内的失败率来预测下一个请求需不需要快速失败,从而自动降级,这些步骤都能在毫秒级完成,相比人工干预的情况提升几个数量级,因此系统的可用性也会大幅提高。下图是优化前后的对比图,可以非常明显的看到,系统的容错能力提升了,TP999也能控制在合理范围内。

性能对比图前

性能对比图后

对于关键路径上的服务故障我们可以减少其影响的用户数。比如手机快捷登录流程里的某个关键服务挂了,我们可以在返回的失败文案上做优化,并且在登录入口挂小黄条提示,让用户主动去其他登录途径,这样对于那些设置过密码或者绑定了第三方的用户还有其他选择。

具体的做法是我们在每个登录入口都关联了一个计数器,一旦其中的关键节点不可用,就会在受影响的计数器上加1,如果节点恢复,则会减1,每个计数器还分别对应一个标志位,当计数器大于0时,标志位为1,否则标志位为0。我们可以根据当前标志位的值得知登录入口的可用情况,从而在登录页展示不同的提示文案,这些提示文案一共有2^5=32种。

登录降级流程图

下图是我们在做故障模拟时的降级提示文案:

3. 异地多活

除了柔性可用,还有一种思路可以来延长不出故障的时间,那就是做冗余,冗余的越多,系统的故障率就越低,并且是呈指数级降低。不管是机房故障,还是存储故障,甚至是网络故障,都能依赖冗余去解决,比如数据库可以通过增加从库的方式做冗余,服务层可以通过分布式架构做冗余,但是冗余也会带来新的问题,比如成本翻倍,复杂性增加,这就要衡量投入产出比。

目前美团的数据中心机房主要在北京上海,各个业务都直接或间接的依赖账号服务,尽管公司内已有北上专线,但因为专线故障或抖动引发的账号服务不可用,间接导致的业务损失也不容忽视,我们就开始考虑做跨城的异地冗余,即异地多活。

3.1 方案设计

首先我们调研了业界比较成熟的做法,主流思路是分set化,优点是非常利于扩展,缺点是只能按一个维度划分。比如按用户ID取模划分set,其他的像手机号和邮箱的维度就要做出妥协,尤其是这些维度还有唯一性要求,这就使得数据同步或者修改都增加了复杂度,而且极易出错,给后续维护带来困难。考虑到账号读多写少的特性(读写比是350:1),我们采用了一主多从的数据库部署方案,优先解决读多活的问题。

Redis如果也用一主多从的模式可行吗?答案是不行,因为Redis主从同步机制会优先尝试增量同步,当增量同步不成功时,再去尝试全量同步,一旦专线发生抖动就会把主库拖垮,并进一步阻塞专线,形成“雪崩效应”。因此两地的Redis只能是双主模式,但是这种架构有一个问题,就是我们得自己去解决数据同步的问题,除了保证数据不丢,还要保证数据一致。

另外从用户进来的每一层路由都要是就近的,因此DNS需要开启智能解析,SLB要开启同城策略,RPC已默认就近访问。

总体上账号的异地多活遵循以下三个原则:
1.北上任何一地故障,另一地都可提供完整服务。
2.北上两地同时对外提供服务,确保服务随时可用。
3.两地服务都遵循BASE原则,确保数据最终一致。

最终设计方案如下:

异地多活架构图

3.2 数据同步

首先要保证数据在传输的过程中不能丢,因此需要一个可靠接收数据的地方,于是我们采用了公司内部的MQ平台Mafka(类Kafka)做数据中转站。可是消息在经过Mafka传递之后可能是乱序的,这导致对同一个key的一串操作序列可能导致不一致的结果,这是不可忍受的。但Mafka只是不保证全局有序,在单个partition内却是有序的,于是我们只要对每个key做一遍一致性散列算法对应一个partitionId,这样就能保证每个key的操作是有序的。

但仅仅有序还不够,两地的并发写仍然会造成数据的不一致。这里涉及到分布式数据的一致性问题,业界有两种普遍的认知,一种是Paxos协议,一种是Raft协议,我们吸取了对实现更为友好的Raft协议,它主张有一个主节点,其余是从节点,并且在主节点不可用时,从节点可晋升为主节点。简单来说就是把这些节点排个序,当写入有冲突时,以排在最前面的那个节点为准,其余节点都去follow那个主节点的值。在技术实现上,我们设计出一个版本号(见下图),实际上是一个long型整数,其中数据源大小即表示节点的顺序,把版本号存入value里面,当两个写入发生冲突的时候只要比较这个版本号的大小即可,版本号大的覆盖小的,这样能保证写冲突时的数据一致性。

异地多活之版本号

写并发时数据同步过程如下图:

写并发时数据同步过程图

这种同步方式的好处显而易见,可以适用于所有的Redis操作且能保证数据的最终一致性。但这也有一些弊端,由于多存了版本号导致Redis存储会增加,另外在该机制下两地的数据其实是全量同步的,这对于那些仅用做缓存的存储来说是非常浪费资源的,因为缓存有数据库可以回源。而账号服务几乎一半的Redis存储都是缓存,因此我们需要对缓存同步做优化。

账号服务的缓存加载与更新模式如下图:

缓存加载与更新模式

我们优化的方向是在缓存加载时不同步,只有在数据库有更新时才去同步。但是数据更新这个流程里不能再使用delete操作,这样做有可能使缓存出现脏数据,比如下面这个例子:

我们对这个问题的解决办法是用set(若key不存在则添加,否则覆盖)代替delete,而缓存的加载用add(若key不存在则添加,否则不修改),这样能保证缓存更新时的强一致性却不需要增加额外存储。考虑到账号修改的入口比较多,我们希望缓存更新的逻辑能拎出来单独处理减少耦合,最后发现公司内部数据同步组件Databus非常适用于该场景,其主要功能是把数据库的变更日志以消息的形式发出来。于是优化后的缓存模式如下图:

从理论变为工程实现的时候还有些需要注意的地方,比如同步消息没发出去、数据收到后写失败了。因此我们还需要一个方法来检测数据不一致的数量,为了做到这点,我们新建了一个定时任务去scan两地的数据做对比统计,如果发现有不一致的还能及时修复掉。

项目上线后,我们也取得一些成果,首先性能提升非常明显,异地的调用平均耗时和TP99、TP999均至少下降80%,并且在一次线上专线故障期间,账号读服务对外的可用性并没有受影响,避免了更大范围的损失。

总结

服务的高可用需要持续性的投入与维护,比如我们会每月做一次容灾演练。高可用也不止体现在某一两个重点项目上,更多的体现在每个业务开发同学的日常工作里。任何一个小Bug都可能引起一次大的故障,让你前期所有的努力都付之东流,因此我们的每一行代码,每一个方案,每一次线上改动都应该是仔细推敲过的。高可用应该成为一种思维方式。最后希望我们能在服务高可用的道路上越走越远。

团队简介

账号团队拥有一群朝气蓬勃的成员:堂堂、德鑫、杨正、可可、徐升、艳豪,虽然他们之中有些刚毕业不久,但技术上都锐意进取,在讨论技术方案时观点鲜明,大家都充分地享受着思想碰撞的火花,这个年轻的团队在一起推进着高可用项目的进行,共同撑起了账号服务的平稳运行及业务发展。

招聘信息

如果你觉得我们的高可用仍有提升空间,欢迎来大众点评基础平台研发组;

如果你想更深入学习高可用的技术细节,欢迎来大众点评基础平台研发组;

如果你想遇到一群志同道合的技术开发,欢迎来大众点评基础平台研发组。

简历传送门:tangtang.sha#dianping.com。

原文出自:https://tech.meituan.com/archives

实时数据产品实践——美团大交通战场沙盘

美团点评阅读(142)

背景

大数据时代,数据的重要性不言而喻,尤其对于互联网公司,随着业务的快速变化,商业模式的不断创新、用户体验个性化、实时化需求日益突出,海量数据实时处理在商业方面的需求越来越大。如何通过数据快速分析出用户的行为,以便做出准确的决策,越来越体现一个公司的价值。现阶段对于实时数据的建设比较单一,主要存在以下问题:

  1. 实时仓库建设不足,维度及指标不够丰富,无法快速满足不同业务需求。
  2. 实时数据和离线数据对比不灵活,无法自动化新增对比基期数据,且对比数据无法预先生产。
  3. 数据监控不及时,一旦数据出现问题而无法及时监控到,就会影响业务分析决策。
    因此,本文将基于美团点评大交通实时数据产品,从面临的挑战、总体解决方案、数据设计架构、后台设计架构等几个方面,详细介绍实时数据系统的整体建设思路。

挑战

实时流数据来源系统较多,处理非常复杂,并且不同业务场景对实时数据的要求不同,因此在建设过程主要有以下挑战:

  1. 如何在保证数据准确性的前提下实现多实时流关联;实时流出现延迟、乱序、重复时如何解决。
    流式计算中通常需要将多个实时流按某些主键进行关联得到特定的实时数据,但不同于离线数据表关联,实时流的到达是一个增量的过程,无法获取实时流的全量数据,并且实时流的达到次序无法确定,因此在进行关联时需要考虑存储一些中间状态及下发策略问题。
  2. 实时流可复用性,实时流的处理不能只为解决一个问题,而是一类甚至几类问题,需要从业务角度对数据进行抽象,分层建设,以快速满足不同场景下对数据的要求。
  3. 中台服务如何保证查询性能、数据预警及数据安全。
    实时数据指标维度较为丰富,多维度聚合查询场景对服务层的性能要求较高,需要服务层能够支持较快的计算能力和响应能力;同时数据出现问题后,需要做好及时监控并快速修复。
  4. 如何保证产品应用需求个性化。
    实时数据与离线数据对比不灵活,需要提供可配置方案,并能够及时生产离线数据。

解决思路

我们在充分梳理业务需求的基础上,重新对实时流进行了建设,将实时数据分层建模,并对外提供统一的接口,保证数据同源同口径;同时,在数据服务层,增加可配置信息模块解决了配置信息不能自动化的问题,在数据处理策略上做了多线程处理、预计算、数据降级等优化,在数据安全方面增加数据审计功能,更好地提升了产品的用户体验。

总体方案

产品整体建设方案基于美团点评技术平台,总共分为源数据层、存储层、服务层及WEB层,整体架构如下所示:


图1 整体架构图

源数据层:主要提供三部分数据,实时数据、离线数据、配置信息、维度信息。
存储层:源数据清洗后放入相应的存储引擎中,为服务层提供数据服务。
服务层:提供三部分功能,数据API服务、预计算服务、权限服务、数据审计服务。
Web层:使用Echarts可视化数据。

数据层

数据架构

依托于美团点评提供的公共资源平台,数据架构按功能分为数据采集、数据处理、数据存储、数据服务四层,如下所示:


图2 数据架构图

数据采集

数据来源主要有两种:业务上报的Log日志及数据库Binlog日志。这些日志通过美团点评日志中心进行采集后存储在消息中间件Kafka中,并按照不同的分类存储在不同的Topic中,供下游订阅。

数据处理

数据处理顾名思义,就是对采集的实时流进行逻辑处理,按业务需求输出对应的实时数据,因此这一步骤是流式计算的关键,分两步进行:数据加工、数据推送。

数据加工:数据加工通常需要在流式计算系统中进行,目前流行的流式处理系统主要有Storm、Spark Streaming系统及Flink系统,这些系统都能在不同的应用场景下发挥很好处理能力,并各有优缺点,如下图所示:

计算框架吞吐量延迟传输保障处理模式成熟度
Storm毫秒级At least once单条处理成熟
Spark Streaming秒级Exactly once微批处理成熟
Flink毫秒级Exactly once单条处理/微批处理新兴

最终我们选择Storm作为实时数据处理框架,并借助公司提供的通用组件来简化拓扑开发流程和重复代码编码。例如,组件MTSimpleLogBolt的主要功能是将Kafka中读取的数据(Log or Binlog)解析成Java实体对象;组件StormConfHelper的功能是获取Storm作业应用配置信息。

数据推送:将处理好的数据推送到存储引擎中。

数据存储

数据加工完成后会被存储到实时存储引擎中,以提供给下游使用。目前常用的存储引擎主要有MySQL、Druid、Elasticsearch、Redis、Tair比较如下:

存储引擎优点缺点
MySQL使用简单,支持数据量小数据量大,对MySQL的压力大,查询性能慢
Druid数据预计算不支持精确查询
Elasticsearch查询效率快,支持常用聚合操作;可以做到精确去重查询条件受限
Redis内存存储KV,查询效率高写入资源有限,不支持大数据量写入
Tair持久化和非持久化两种缓存,查询效率高单节点性能比Redis较弱
Kylin多维查询预计算不支持实时

综上比较,由于实时数据量较大,且数据精度要求较高,因此我们最终选择交易存储使用ES,流量存储使用Druid,维度存储使用Tair,中间数据存储使用Redis;而离线数据,我们采用Hive和Kylin存储。

数据服务

将存储引擎数据统一对外提供查询服务,支持不同业务应用场景。

具体实现

实时流处理流程

整个数据层架构上主要分为实时数据和离线数据两部分:实时数据分为交易的Binlog日志和流量的Log日志,经过Strom框架处理后写入Kafka,再经过DataLinkStreaming分别写入ES和Druid;离线数据通过Hive处理写入Kylin。


图3 产品数据架构

下图所示为一条消息的处理流程:


图4 数据关系

两个Topic分别是order_base(主要存放订单基本信息:订单id、订单状态、支付时间、票量、金额等);order_biz(主要存放订单的扩展信息:订单id、订单类型、出发时间、到达时间、出发城市、到达城市)。我们最终要拿到一条包括上述全部内容的一条记录。


图5 数据处理流程

具体例子:Bolt在处理一条记录时,首先判断这条记录是base还是biz,如果是base则写入缓存中base的Category中,如果是biz则写入biz的Category中。以order_id为Key,如果是base则去和biz关联,如果biz存在则代表能够关联上,这时发送关联后的完整数据,同时删除该主键(order_key)记录;如果biz中不存在,则说明没关联上,这时可能biz的数据延迟或者是丢失,为了保证主数据的准确性,这时我们只发送base的数据,缓存中的数据保留不被删除。如果这条消息是biz,则首先会更新缓存中该主键的biz记录,然后去和base关联,关联上则发送同时删除base中数据,否则不发送。此时我们会根据ES的Update特性去更新之前的数据。从现实效果来看保证了99.2%的数据完整性,符合预期。

数据写入

在Topic2es的数据推送中,通过DataLinkString工具(底层Spark Streaming)实现了Kafka2es的微批次同步,一方面通过多并发batch写入ES获得了良好的吞吐,另一方面提供了5秒的实时写入效率,保证了ES查询的实时可见。同时我们也维护了Kafka的Offset,可以提供At lease once的同步服务,并结合ES的主键,可以做到Exactly once,有效解决了数据重复问题。

ES索引设计及优化

在数据写入ES过程中,由于数据量大,索引时间区间长,在建设索引时需要考虑合理设计保证查询效率,因此主要有以下三点优化:

  • 写入优化 在通过DataLinkString写入ES时,在集群可接受的范围内,数据Shuffle后再分组,增加Client并发数,提升写入效率。
  • 数据结构化 根据需要设计了索引的模版,使用了最小的足够用的数据类型。
  • 按天建索引 通过模版按天建索引,避免影响磁盘IO效率,同时通过别名兼容搜索一致性。
  • 设置合理的分片和副本数 如果分片数过少或过多都会导致检索比较慢。分片数过多会导致检索时打开比较多的文件,另外也会影响多台服务器之间通讯。而分片数过少为导至单个分片索引过大,所以检索速度慢。在确定分片数之前需要进行单服务单索引单分片的测试。 我们根据 索引分片数=数据总量/单分片数 设置了合理的分片数。

实时数据仓库模型

整个实时数据开发遵循大交通实时数仓的分层设计,在此也做一下简单介绍,实时数仓架构如下:

图6 实时数仓架构

ODS层:包含美团页面流量日志、模块事件日志以及用户操作的Binlog信息日志,是直接从业务系统采集过来的原始数据。
事实明细层:根据主题和业务过程,生成订单事实和流量事实。
汇总层:对明细层的数据扩展业务常用的维度信息,形成主题宽表。
App层:针对不同应用在汇总层基础上加工扩展的聚合数据,如火车票在抢票业务下的交易数据汇总信息。

规范建模后,业务需求来临时,只需要在App层建模即可,底层数据统一维护。

中台服务层

后台服务主要实现 登陆验证和权限验证(UPM)、指标逻辑计算和API、预计算服务、数据质量监控、数据审计功能。由于数据量大且实时性要求较高,在实现过程遇到如下挑战:

  • 如何保证查询响应性能。
  • 服务发生故障后,数据降级方案。
  • 数据监控预警方案及数据出现问题解决方案。

针对以上问题,下面进行一一详述:

性能响应优化

服务层处理数据过程中,由于数据量大,在查询时需要一定的响应时间,所以在保证响应性能方面,主要做了以下优化:


图7 性能响应优化

  1. 项目初始由于数据量不是很大,采用单线程直接查询ES,能够满足需求。
  2. 随着节假日来临,数据量大增,并行查询人次增多,查询响应变慢,无法快速响应结果,因此引入缓存技术,将中间结果进行缓存,并在缓存有效期内,直接读取缓存数据大大提高了时间效率;并且在此基础上,引入Master-Worker多线程模式,将多指标查询拆分,并行查询ES,使得查询响应大大提高。
  3. 虽然问题得到解决,但仍存在一个问题,就是每次都是现查ES及部分中间缓存结果,尤其是第一次查询,需要完全走ES,这样就会让第一个查询数据的用户体验较差,因此引入预计算服务,通过定时调度任务,将部分重要维度下的指标进行预计算放入缓存,用户查询时直接读取缓存数据。而一些不常用的维度下的数据,采用的策略是,第一个用户查询时现查ES,并将结果数据预加载到缓存,后续所有用户再次查询直接读缓存数据,这样既能保证用户体验,也不至于占用太多缓存空间。

数据降级方案

使用缓存避免不了出现一些问题,比如缓存失效、缓存雪崩等问题,针对缓存雪崩问题,通过设置不同Key的过期时间能够很好的解决;而对于缓存数据失效,我们有自己的数据降级方案,具体方案如下:


图8 数据降级方案

预计算数据会分别在Redis、Tair和本地缓存中存储一份以保证查询效率,当查询Redis数据不存在时,会去Tair中读取数据,Tair也为空时,会读取本地缓存,只有当本地缓存数据也为空时,才会现查ES做聚合计算,这样也会降低ES的查询压力。

数据监控

实时监控预警非常重要,在数据出现问题时,一方面能够及时通知我们快速定位修复数据,另一方面也能够及时周知业务同学,避免做出错误分析。基于此,我们做了两方面的实时监控,其一是对源实时流在Storm处理层面的监控,确保源实时流正确生产;其二是对展示的汇总数据进行监控,确保产品展示指标数据正常。
针对数据出现问题预警,我们在解决方案上规范了流程:

  1. 监控报警机制及时周知相关同学。
  2. 第一时间通过产品上方的黄条提示用户哪些数据异常。
  3. 快速定位问题,给出修复方案。
    目前对于实时异常数据的修补,主要有两种方法:
    a. 针对特殊情况的数据修补方案第一灵活指定Offset,重新消费Kafka数据。
    b. 预留了Hive2es的准实时重导功能,确保生产数据的准确和完整。

数据安全

在以数据取胜的时代,数据的安全不言而喻,我们采用公司提供的UPM权限接口进行二级权限管理并加入审计功能及水印功能,能够准确记录用户的所有访问以及操作记录,并且将日志数据格式化到数据库中,进行实时监控分析。

总结

实时数据可以为业务特定场景分析决策提供巨大支持,尤其对于大交通节假日及春运期间。在大交通实时战场沙盘产品化过程中,我们投入了大量的思考和实践,主要取得以下收益:

  1. 可视化的产品,为业务方实时分析提供极大便利,取得较好的反馈。
  2. 优化实时数据仓库建设,合理分层建模,规范命名设计,统一维度建设和指标口径,对外提供统一接口,保证数据规范准确。
  3. 在Storm框架下实时开发和数据写入方面积累了一定的经验。
  4. 服务层支持可配置信息,可以灵活配置个性化信息。
  5. 服务层性能及获取数据策略的优化,为用户带来更好的产品体验。

加入我们

最后插播一个招聘广告,我们是一群擅长大数据领域数据建设、数仓建设、数据治理及数据BI应用建设的工程师,期待更多能手加入,感兴趣的可以投递个人简历到邮箱:yangguang09#meituan.com,欢迎您的加入。

作者介绍

娣娣,美团点评数据开发工程师,2015年加入美团,从事数据仓库建设、大数据产品开发工作。
晓磊,美团点评数据开发工程师,2017年加入美团,从事数据仓库建设、大数据产品开发工作。

原文出自:https://tech.meituan.com/archives

分布式块存储系统Ursa的设计与实现

美团点评阅读(256)

引言

云硬盘对IaaS云计算平台有至关重要的作用,几乎已成为必备组件,如亚马逊的EBS(Elastic Block Store)、阿里云的盘古、OpenStack中的Cinder等。云硬盘可为云计算平台带来许多优良特性,如更高的数据可靠性和可用性、灵活的数据快照功能、更好的虚拟机动态迁移支持、更短的主机故障恢复时间等等。随着万兆以太网逐渐普及,云硬盘的各项优势得到加强和凸显,其必要性变得十分强烈。

云硬盘的底层通常是分布式块存储系统,目前开源领域有一些此类项目,如Ceph RBD、Sheepdog。另外MooseFS和GlusterFS虽然叫做文件系统,但由于其特性与块存储系统接近,也能用于支持云硬盘。我们在测评中发现,这些开源项目均存在一些问题,使得它们都难以直接应用在大规模的生产系统当中。例如Ceph RBD的效率较低(CPU使用过高);Sheepdog在压力测试中出现了数据丢失的现象;MooseFS的POSIX语义支持、基于FUSE的架构、不完全开源的2.0版本等问题给它自身带来了许多的局限性;GlusterFS与Ceph同属红帽收购的开源存储系统,主要用于scale-out文件存储场景,在云计算领域使用不多。此外,这些存储系统都难以充发挥用万兆网卡和SSD的性能潜力,难以在未来承担重任。

由于以上原因,美团云研发了全新的分布式块存储系统Ursa,通过简单稳固的系统架构、高效的代码实现以及对各种非典型场景的仔细考虑,实现了高可靠、高可用、高性能、低开销、可扩展、易运维、易维护等等目标。Ursa的名字源于DotA中的熊战士,他具有极高的攻击速度、攻击力和生命值,分别隐喻存储系统中的IOPS、吞吐率和稳定性。

分布式块存储相关项目与技术

2.1 Ceph

(主要参考:https://www.ustack.com/blog/ceph_infra/

Ceph项目起源于其创始人Sage Weil在加州大学Santa Cruz分校攻读博士期间的研究课题。项目的起始时间为2004年。在2006年的OSDI学术会议上,Sage发表了关于Ceph的论文,并提供了项目的下载链接,由此开始广为人知。2010年Ceph客户端部分代码正式进入Linux kernel 2.6.34。

Ceph同时提供对象、块和文件这三个层次的分布式存储服务,其中只有块层存储与我们相关。由于块存储在IaaS云计算系统中占有重要地位,Ceph在近些年的关注度得到显著提高。许多云计算系统实例基于Ceph提供块存储服务,如UnitedStack、Mirantis OpenStack等。

Ceph性能测试

  • 测试版本:0.81
  • 操作系统:CentOS 6.x
  • 测试工具:fio
  • 服务器配置:
    • CPU: Intel Xeon E5-2650v2 @ 2.6GHz
    • RAM: 96GB
    • NIC: 10 GbE
    • HDD: 6 NL SAS, 7200 RPM
    • RAID Controller: Dell H710p (LSI 2208 with 1GB NVRAM)
  • 服务器数量:4,其中一个为兼职客户端

注意:由于客户端位于一个存储服务器上,所以有1/4的吞吐率不经过网卡。

测试结果如下:

  • 读IOPS:16 407(此时客户端CPU占用率超过500%,5台服务器CPU总使用率接近500%)
  • 写IOPS:941
  • 顺序读吞吐率:218 859 KB/s
  • 顺序写吞吐率:67 242 KB/s
  • 顺序读延迟:1.6ms (664 IOPS)
  • 顺序写延迟:4.4ms (225 IOPS)
  • 网络ping值:0.1324ms
  • 本地硬盘顺序读写延迟:0.03332ms (29 126 IOPS)

从测试来看,Ceph的读吞吐率正常,然而写吞吐率不足读的1/3,性能偏低;读写延迟比显著大于网络延迟与磁盘I/O延迟之和;CPU占用率过高。

2.2 Sheepdog

(主要参考:http://peterylh.blog.163.com/blog/static/12033201221594937257/

Sheepdog是由日本NTT实验室的Morita Kazutaka专为虚拟化平台创立的分布式块存储开源项目, 于2009年开源[1]。从2011年9月开始, 一些淘宝的工程师加入了Sheepdog项目,以及相关开源项目比如Corosync、Accord的开发。Sheepdog主要由两部分组成:集群管理和存储服务,其中集群管理目前使用Corosync或者Zookper来完成,存储服务是全新实现的。

Sheepdog采用无中心节点的全对称架构,基于一致性哈希实现从ObjectID到存储节点的定位:每个节点划分成多个虚拟节点,虚拟节点和ObjectID一样,采用64位整数唯一标识,每个虚拟节点负责一段包含节点ID在内的ObjectID区间。DataObject副本存在ObjectID对应的虚拟节点,及在后续的几个节点上。Sheepdog无单点故障问题,存储容量和性能均可线性扩展,新增节点通过简单配置即可加入集群,并且Sheepdog自动实现负载均衡,节点故障时可自动发现并进行副本修复,还直接支持QEMU/KVM。

Sheepdog的服务进程既承担数据服务的职责,同时也是客户端(QEMU)访问数据的gateway。QEMU的Sheepdog driver将对volume的请求转换成为对object的请求,然后通过UNIX domain socket或者TCP socket连接一个Sheepdog服务进程,并将访问请求发给该进程来完成后续步骤。Sheepdog的服务进程还可以开启数据缓存功能,以减少网络I/O。Sheepdog的I/O路径是“client<->gateway<->object manager(s)”,读操作可以在任意副本完成,更新操作并行的发往所有副本, 当所有副本都更新成功之后,gateway才告诉客户端更新操作成功。

Sheepdog的数据可靠性问题

我们对Sheepdog开展了可靠性、可用性测试。在测试中有共3台服务器,每台配有6个机械硬盘,配置好Sheepdog之后,每台服务器启动10个VM,每个VM内无限循环运行fio分别执行小块随机读、写和大块顺序读、写的测试。

在执行压力测试一周后,对集群中的全部数据进行一致性检测(collie cluster check),发现有些数据块副本与另外2个不一致(“fixed replica …”),有些数据块的3个各不相同(“no majority of …”):

[root@node3-10gtest ~]# collie cluster check
fix vdi test1-3
99.9 % [=================================================================>] 50 GB / 50 GB 
fixed replica 3e563000000fca
99.9 % [=================================================================>] 50 GB / 50 GB      
fixed replica 3e563000000fec
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e5630000026f5
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e563000002da6
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e563000001e8c
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e563000001530
...
fix vdi test2-9
50.9 % [=================================>                                ] 25 GB / 50 GB      
no majority of d781e300000123
51.0 % [===================================>                              ] 26 GB / 50 GB      
no majority of d781e300000159
51.2 % [===================================>                              ] 26 GB / 50 GB      
no majority of d781e30000018a
53.2 % [====================================>                             ] 27 GB / 50 GB      
…

2.3 MooseFS

(主要参考:http://peterylh.blog.163.com/blog/static/12033201251791139592/

MooseFS是容错的分布式文件系统,通过FUSE支持标准POSIX文件系统接口。 MooseFS的架构类似于GFS,由四个部分组成:

  • 管理服务器Master:类似于GFS的Master,主要有两个功能:(1)存储文件和目录元数据,文件元数据包括文件大小、属性、对应的Chunk等;(2)管理集群成员关系和Chunk元数据信息,包括Chunk存储、版本、Lease等。
  • 元数据备份服务器Metalogger Server:根据元数据文件和log实时备份Master元数据。
  • 存储服务器Chunk Server:负责存储Chunk,提供Chunk读写能力。Chunk文件默认为64MB大小。
  • 客户端Client:以FUSE方式挂到本地文件系统,实现标准文件系统接口。

MooseFS本地不会缓存Chunk信息, 每次读写操作都会访问Master, Master的压力较大。此外MooseFS写操作流程较长且开销较高。MooseFS支持快照,但是以整个Chunk为单位进行CoW(Copy-on-Write),可能造成响应时间恶化,补救办法是以牺牲系统规模为代价,降低Chunk大小。

MooseFS基于FUSE提供POSIX语义支持,已有应用可以不经修改直接迁移到MooseFS之上,这给应用带来极大的便利。然而FUSE也带来了一些负面作用,比如POSIX语义对于块存储来说并不需要,FUSE会带来额外开销等等。

2.4 GFS/HDFS

(主要参考:http://www.nosqlnotes.net/archives/119

HDFS基本可以认为是GFS的一个简化开源实现,二者因此有很多相似之处。首先,GFS和HDFS都采用单一主控机+多台工作机的模式,由一台主控机(Master)存储系统全部元数据,并实现数据的分布、复制、备份决策,主控机还实现了元数据的checkpoint和操作日志记录及回放功能。工作机存储数据,并根据主控机的指令进行数据存储、数据迁移和数据计算等。其次,GFS和HDFS都通过数据分块和复制(多副本,一般是3)来提供更高的可靠性和更高的性能。当其中一个副本不可用时,系统都提供副本自动复制功能。同时,针对数据读多于写的特点,读服务被分配到多个副本所在机器,提供了系统的整体性能。最后,GFS和HDFS都提供了一个树结构的文件系统,实现了类似与Linux下的文件复制、改名、移动、创建、删除操作以及简单的权限管理等。

然而,GFS和HDFS在关键点的设计上差异很大,HDFS为了规避GFS的复杂度进行了很多简化。例如HDFS不支持并发追加和集群快照,早期HDFS的NameNode(即Master)没原生HA功能。总之,HDFS基本可以认为是GFS的简化版,由于时间及应用场景等各方面的原因对GFS的功能做了一定的简化,大大降低了复杂度。

2.5 HLFS

(主要参考:http://peterylh.blog.163.com/blog/static/120332012226104116710/

HLFS(HDFS Log-structured File System)是一个开源分布式块存储系统,其最大特色是结合了LFS和HDFS。HDFS提供了可靠、随时可扩展的文件服务,而HLFS通过Log-structured技术弥补了HDFS不能随机更新的缺憾。在HLFS中,虚拟磁盘对应一个文件, 文件长度能够超过TB级别,客户端支持Linux和Xen,其中Linux基于NBD实现,Xen基于blktap2实现,客户端通过类POSIX接口libHLFS与服务端通讯。HLFS主要特性包括多副本、动态扩容、故障透明处理和快照。

HLFS性能较低。首先,非原地更新必然导致数据块在物理上非连续存放,因此读I/O比较随机,顺序读性能下降。其次,虽然从单个文件角度看来,写I/O是顺序的,但是在HDFS的Chunk Server服务了多个HLFS文件,因此从它的角度来看,I/O仍然是随机的。第三,写延迟问题,HDFS面向大文件设计,小文件写延时不够优化。第四,垃圾回收的影响,垃圾回收需要读取和写入大量数据,对正常写操作造成较大影响。此外,按照目前实现,相同段上的垃圾回收和读写请求不能并发,垃圾回收算法对正常操作的干扰较大。

2.6 iSCSI、FCoE、AoE、NBD

iSCSI、FCoE、AoE、NBD等都是用来支持通过网络访问块设备的协议,它们都采用C/S架构,无法直接支持分布式块存储系统。

Ursa的设计与实现

分布式块存储系统给虚拟机提供的是虚拟硬盘服务,因而有如下设计目标:

  • 大文件存储:虚拟硬盘实际通常GB级别以上,小于1GB是罕见情况
  • 需要支持随机读写访问,不需支持追加写,需要支持resize
  • 通常情况下,文件由一个进程独占读写访问;数据块可被共享只读访问
  • 高可靠,高可用:任意两个服务器同时出现故障不影响数据的可靠性和可用性
  • 能发挥出新型硬件的性能优势,如万兆网络、SSD
  • 由于应用需求未知,同时需要优化吞吐率和IOPS
  • 高效率:降低资源消耗,就降低了成本

除了上述源于虚拟硬盘的直接需求意外,分布式块存储还需要支持下列功能:

  • 快照:可给一个文件在不同时刻建立多个独立的快照
  • 克隆:可将一个文件或快照在逻辑上复制成独立的多份
  • 精简配置(thin-provisioning):只有存储数据的部分才真正占用空间

3.1 系统架构

分布式存储系统总体架构可以分为有master(元数据服务器)和无master两大类。有master架构在技术上较为简单清晰,但存在单点失效以及潜在的性能瓶颈问题;无master架构可以消除单点失效和性能瓶颈问题,然而在技术上却较为复杂,并且在数据布局方面具有较多局限性。块存储系统对master的压力不大,同时master的单点故障问题可采用一些现有成熟技术解决,因而美团EBS的总体架构使用有master的类型。这一架构与GFS、HDFS、MooseFS等系统的架构属于同一类型。

如图1所示,美团EBS系统包括M、S和C三个部分,分别代表Master、Chunk Server和Client。Master中记录的元数据包括3种:(1)关于volume的信息,如类型、大小、创建时间、包含哪些数据chunk等等;(2)关于chunk的信息,如大小、创建时间、所在位置等;(3)关于Chunk Server的信息,如IP地址、端口、数据存储量、I/O负载、空间剩余量等。这3种信息当中,关于volume的信息是持久化保存的,其余两种均为暂存信息,通过Chunk Server上报。

I/O Stack

在元数据中,关于volume的信息非常重要,必须持久化保存;关于chunk的信息和Chunk Server的信息是时变的,并且是由Chunk Server上报的,因而没必要持久化保存。根据这样的分析,我们将关于volume的信息保存在MySQL当中,其他元数据保存在Redis当中,余下的集群管理功能由Manager完成。Master == Manager + MySQL + Redis,其中MySQL使用双机主从配置,Redis使用官方提供的标准cluster功能。

3.2 CAP取舍

C、A、P分别代表Consistency、Availability和Partition-tolerance。分布式系统很难同时在这三个方面做到很高的保障,通常要在仔细分析应用需求的基础之上对CAP做出取舍。

块存储系统主要用于提供云硬盘服务,每块云硬盘通常只会挂载到1台VM之上,不存在多机器并发读写的情况,因而其典型应用场景对一致性的需求较低。针对这一特性,我们可以在设计上舍C而取AP。

对于多机并行访问云硬盘的使用模式,若数据是只读的则无需额外处理;若数据有写有读,甚至是多写多读,则需要在上层引入分布式锁,来确保数据一致性和完整性。这种使用模式在SAN领域并不少见,其典型应用场景是多机同时挂载一个网络硬盘,并通过集群文件系统(而不是常见的单机文件系统)来协调访问存储空间。集群文件系统内部会使用分布式锁来确保数据操作的正确性,所以我们舍C的设计决策不会影响多机并行访问的使用模式。

3.3 并发模型

并发(不是并行!)模型的选择和设计无法作为实现细节隐藏在局部,它会影响到程序代码的各个部分,从底层到上层。基本的并发模型只有这样几种:事件驱动、多线程、多进程以及较为小众的多协程。

事件驱动模型是一种更接近硬件工作模式的并发模型,具有更高的执行效率,是高性能网络服务的普遍选择。为能充分发挥万兆网络和SSD的性能潜力,我们必须利用多核心并行服务,因而需要选用多线程或者多进程模型。由于多进程模型更加简单,进程天然是故障传播的屏障,这两方面都十分有利于提高软件的健壮性;并且我们也很容易对业务进行横向拆分,做到互相没有依赖,也不需要交互,所以我们选择了多进程模型,与事件驱动一起构成混合模型。

协程在现实中的应用并不多,很多语言/开发生态甚至不支持协程,然而协程在一些特定领域其实具有更好的适用性。比如,QEMU/KVM在磁盘I/O方面的并发执行完全是由协程负责的,即便某些block driver只提供了事件驱动的接口(如Ceph RBD),QEMU/KVM也会自动把它们转化封装成多协程模式。实践表明,在并发I/O领域,多协程模型可以同时在性能和易用性方面取得非常好的效果,所以我们做了跟QEMU/KVM类似的选择——在底层将事件驱动模型转换成了多协程模型,最终形成了“多进程+多协程+事件驱动”的混合并发模型。

3.4 存储结构

如图所示,Ursa中的存储结构与GFS/HDFS相似,存储卷由64MB(可配置)大小的chunk组成。Ursa系统中的数据复制、故障检测与修复均在chunk层次进行。系统中,每3个(可配置)chunk组成一组,互为备份。每2个chunk组构成一个stripe组,实现条带化交错读写,提高单客户端顺序读写性能。Ursa在I/O栈上层添加Cache模块,可将最常用的数据缓存在客户端本地的SSD介质当中,当访问命中缓存时可大大提高性能。

I/O Stack

3.5 写入策略

最常见的写入策略有两种:(1)客户端直接写多个副本到各个Chunk Server,如图(a)所示,Sheepdog采用此种办法;(2)客户端写第一个副本,并将写请求依次传递下去,如图(b)所示。这两种方法各有利弊:前者通常具有较小的写入延迟,但吞吐率最高只能达到网络带宽的1/3;后者的吞吐率可以达到100%网络带宽,却具有较高的写入延迟。

I/O Stack

由于Ursa可能用于支持各种类型的应用,必须同时面向吞吐率和带宽进行优化,所以我们设计采用了一种分叉式的写入策略:如图(c)所示,客户端使用WRITE_REPLICATE请求求将数据写到第一个副本,称为primary,然后由primary负责将数据分别写到其他副本。这样Ursa可以在吞吐率和延迟两方面取得较好的均衡。为确保数据可靠性,写操作会等所有副本的写操作都完成之后才能返回。

3.6 无状态服务

Chunk Server内部几乎不保存状态,通常情况下各个请求之间是完全独立执行的,并且重启Chunk Server不会影响后续请求的执行。这样的Chunk Server不但具有更高的鲁棒性,而且具有更高的扩展性。许多其他网络服务、协议的设计都遵循了无状态的原则。

3.7 模块

如下图所示,Ursa中的I/O功能模块的组织采用decorator模式,即所有模块都实现了IStore抽象接口,其中负责直接与Chunk Server通信的模块属于decorator模式中的concrete component,其余模块均为concrete decorator。所有的decorator都保存数量不等的指向其他模块的指针(IStore指针)。

I/O Stack

在运行时,Ursa的I/O栈层次结构的对象图如下所示。

I/O Stack

3.8 产品界面

I/O Stack

性能实测

如下图所示,测试环境由万兆以太网、1台Client和3台Chunk Server组成,chunk文件存在tmpfs上,即所有写操作不落盘,写到内存为止。选择tmpfs主要是为了避免硬盘的I/O速度的局限性,以便测试Ursa在极限情况下的表现。

I/O Stack

测试环境的网络延迟如下:

I/O Stack

在上述环境中,用fio分别测试了读写操作的吞吐率、IOPS以及I/O延迟,测试参数与结果如下:

I/O Stack

从测试结果可以看出:
(1). Ursa在吞吐率测试中可以轻易接近网卡理论带宽;
(2). Ursa的IOPS性能可以达到SSD的水准;
(3). Ursa的读延迟接近ping值,写操作需要写3个副本,延迟比读高68%。

作为对比,我们在测试Ceph的过程中监测到服务端CPU占用情况如下:

I/O Stack

此时机器上5个存储服务进程ceph-osd共占用123.7%的CPU资源,提供了4 101的读IOPS服务;而Ursa的服务进程只消耗了43%的CPU资源,提供了61 340读IOPS服务,运行效率比Ceph高43倍。在客户端方面,Ceph消耗了500%+的CPU资源,得到了16 407读IOPS;而Ursa只消耗了96%的CPU资源,得到了61 340读IOPS,运行效率比Ceph高21倍

总结与展望

Ursa从零开始动手开发到内部上线只经历了9个月的时间,虽然基本功能、性能都已达到预期,但仍有许多需要进一步开发的地方。一个重要的方向是对SSD的支持。虽然将HDD简单替换为SSD,不修改Ursa的任何代码、配置就可以运行,并取得性能上的显著改善,然而SSD在性能、成本、寿命等方面与HDD差异巨大,Ursa必须做出针对性的优化才能使SSD扬长避短。另一个重要方向是对大量VM同时启动的更好的支持。如果同时有上百台相同的VM从Ursa启动(即系统盘放在Ursa上),会在短时间内有大量读请求访问相同的数据集(启动文件),形成局部热点,此时相关的Chunk Server服务能力会出现不足,导致启动变慢。由于各个VM所需的启动数据基本相同,我们可以采用“一传十,十传百”的方式组织一个应用层组播树overlay网络来进行数据分发,这样可以从容应对热点数据访问问题。随着一步步的完善,相信Ursa将在云平台的运营当中起到越来越重要的作用。

原文出自:https://tech.meituan.com/archives

Spark在美团的实践

美团点评阅读(231)

本文已发表在《程序员》杂志2016年4月期。

前言

美团是数据驱动的互联网服务,用户每天在美团上的点击、浏览、下单支付行为都会产生海量的日志,这些日志数据将被汇总处理、分析、挖掘与学习,为美团的各种推荐、搜索系统甚至公司战略目标制定提供数据支持。大数据处理渗透到了美团各业务线的各种应用场景,选择合适、高效的数据处理引擎能够大大提高数据生产的效率,进而间接或直接提升相关团队的工作效率。
美团最初的数据处理以Hive SQL为主,底层计算引擎为MapReduce,部分相对复杂的业务会由工程师编写MapReduce程序实现。随着业务的发展,单纯的Hive SQL查询或者MapReduce程序已经越来越难以满足数据处理和分析的需求。
一方面,MapReduce计算模型对多轮迭代的DAG作业支持不给力,每轮迭代都需要将数据落盘,极大地影响了作业执行效率,另外只提供Map和Reduce这两种计算因子,使得用户在实现迭代式计算(比如:机器学习算法)时成本高且效率低。
另一方面,在数据仓库的按天生产中,由于某些原始日志是半结构化或者非结构化数据,因此,对其进行清洗和转换操作时,需要结合SQL查询以及复杂的过程式逻辑处理,这部分工作之前是由Hive SQL结合Python脚本来完成。这种方式存在效率问题,当数据量比较大的时候,流程的运行时间较长,这些ETL流程通常处于比较上游的位置,会直接影响到一系列下游的完成时间以及各种重要数据报表的生成。
基于以上原因,美团在2014年的时候引入了Spark。为了充分利用现有Hadoop集群的资源,我们采用了Spark on Yarn模式,所有的Spark app以及MapReduce作业会通过Yarn统一调度执行。Spark在美团数据平台架构中的位置如图所示:

 离线计算平台架构图

经过近两年的推广和发展,从最开始只有少数团队尝试用Spark解决数据处理、机器学习等问题,到现在已经覆盖了美团各大业务线的各种应用场景。从上游的ETL生产,到下游的SQL查询分析以及机器学习等,Spark正在逐步替代MapReduce作业,成为美团大数据处理的主流计算引擎。目前美团Hadoop集群用户每天提交的Spark作业数和MapReduce作业数比例为4:1,对于一些上游的Hive ETL流程,迁移到Spark之后,在相同的资源使用情况下,作业执行速度提升了十倍,极大地提升了业务方的生产效率。
下面我们将介绍Spark在美团的实践,包括我们基于Spark所做的平台化工作以及Spark在生产环境下的应用案例。其中包含Zeppelin结合的交互式开发平台,也有使用Spark任务完成的ETL数据转换工具,数据挖掘组基于Spark开发了特征平台和数据挖掘平台,另外还有基于Spark的交互式用户行为分析系统以及在SEM投放服务中的应用,以下是详细介绍。

Spark交互式开发平台

在推广如何使用Spark的过程中,我们总结了用户开发应用的主要需求:

  1. 数据调研:在正式开发程序之前,首先需要认识待处理的业务数据,包括:数据格式,类型(若以表结构存储则对应到字段类型)、存储方式、有无脏数据,甚至分析根据业务逻辑实现是否可能存在数据倾斜等等。这个需求十分基础且重要,只有对数据有充分的掌控,才能写出高效的Spark代码;
  2. 代码调试:业务的编码实现很难保证一蹴而就,可能需要不断地调试;如果每次少量的修改,测试代码都需要经过编译、打包、提交线上,会对用户的开发效率影响是非常大的;
  3. 联合开发:对于一整个业务的实现,一般会有多方的协作,这时候需要能有一个方便的代码和执行结果共享的途径,用于分享各自的想法和试验结论。

基于这些需求,我们调研了现有的开源系统,最终选择了Apache的孵化项目Zeppelin,将其作为基于Spark的交互式开发平台。Zeppelin整合了Spark,Markdown,Shell,Angular等引擎,集成了数据分析和可视化等功能。

 Zeppelin实例

我们在原生的Zeppelin上增加了用户登陆认证、用户行为日志审计、权限管理以及执行Spark作业资源隔离,打造了一个美团的Spark的交互式开发平台,不同的用户可以在该平台上调研数据、调试程序、共享代码和结论。

集成在Zeppelin的Spark提供了三种解释器:Spark、Pyspark、SQL,分别适用于编写Scala、Python、SQL代码。对于上述的数据调研需求,无论是程序设计之初,还是编码实现过程中,当需要检索数据信息时,通过Zeppelin提供的SQL接口可以很便利的获取到分析结果;另外,Zeppelin中Scala和Python解释器自身的交互式特性满足了用户对Spark和Pyspark分步调试的需求,同时由于Zeppelin可以直接连接线上集群,因此可以满足用户对线上数据的读写处理请求;最后,Zeppelin使用Web Socket通信,用户只需要简单地发送要分享内容所在的http链接,所有接受者就可以同步感知代码修改,运行结果等,实现多个开发者协同工作。

Spark作业ETL模板

除了提供平台化的工具以外,我们也会从其他方面来提高用户的开发效率,比如将类似的需求进行封装,提供一个统一的ETL模板,让用户可以很方便的使用Spark实现业务需求。

美团目前的数据生产主体是通过ETL将原始的日志通过清洗、转换等步骤后加载到Hive表中。而很多线上业务需要将Hive表里面的数据以一定的规则组成键值对,导入到Tair中,用于上层应用快速访问。其中大部分的需求逻辑相同,即把Hive表中几个指定字段的值按一定的规则拼接成key值,另外几个字段的值以json字符串的形式作为value值,最后将得到的对写入Tair。

 hive2Tair流程示例

由于Hive表中的数据量一般较大,使用单机程序读取数据和写入Tair效率比较低,因此部分业务方决定使用Spark来实现这套逻辑。最初由业务方的工程师各自用Spark程序实现从Hive读数据,写入到Tair中(以下简称hive2Tair流程),这种情况下存在如下问题:
每个业务方都要自己实现一套逻辑类似的流程,产生大量重复的开发工作;
由于Spark是分布式的计算引擎,因此代码实现和参数设置不当很容易对Tair集群造成巨大压力,影响Tair的正常服务。
基于以上原因,我们开发了Spark版的hive2Tair流程,并将其封装成一个标准的ETL模板,其格式和内容如下所示:

 hive2Tair  ETL模版

source用于指定Hive表源数据,target指定目标Tair的库和表,这两个参数可以用于调度系统解析该ETL的上下游依赖关系,从而很方便地加入到现有的ETL生产体系中。

有了这个模板,用户只需要填写一些基本的信息(包括Hive表来源,组成key的字段列表,组成value的字段列表,目标Tair集群)即可生成一个hive2Tair的ETL流程。整个流程生成过程不需要任何Spark基础,也不需要做任何的代码开发,极大地降低了用户的使用门槛,避免了重复开发,提高了开发效率。该流程执行时会自动生成一个Spark作业,以相对保守的参数运行:默认开启动态资源分配,每个Executor核数为2,内存2GB,最大Executor数设置为100。如果对于性能有很高的要求,并且申请的Tair集群比较大,那么可以使用一些调优参数来提升写入的性能。目前我们仅对用户暴露了设置Executor数量以及每个Executor内存的接口,并且设置了一个相对安全的最大值规定,避免由于参数设置不合理给Hadoop集群以及Tair集群造成异常压力。

基于Spark的用户特征平台

在没有特征平台之前,各个数据挖掘人员按照各自项目的需求提取用户特征数据,主要是通过美团的ETL调度平台按月/天来完成数据的提取。

但从用户特征来看,其实会有很多的重复工作,不同的项目需要的用户特征其实有很多是一样的,为了减少冗余的提取工作,也为了节省计算资源,建立特征平台的需求随之诞生,特征平台只需要聚合各个开发人员已经提取的特征数据,并提供给其他人使用。特征平台主要使用Spark的批处理功能来完成数据的提取和聚合。
开发人员提取特征主要还是通过ETL来完成,有些数据使用Spark来处理,比如用户搜索关键词的统计。
开发人员提供的特征数据,需要按照平台提供的配置文件格式添加到特征库,比如在图团购的配置文件中,团购业务中有一个用户24小时时段支付的次数特征,输入就是一个生成好的特征表,开发人员通过测试验证无误之后,即完成了数据上线;另外对于有些特征,只需要从现有的表中提取部分特征数据,开发人员也只需要简单的配置即可完成。

 特征平台 数据流向图

在图中,我们可以看到特征聚合分两层,第一层是各个业务数据内部聚合,比如团购的数据配置文件中会有很多的团购特征、购买、浏览等分散在不同的表中,每个业务都会有独立的Spark任务来完成聚合,构成一个用户团购特征表;特征聚合是一个典型的join任务,对比MapReduce性能提升了10倍左右。第二层是把各个业务表数据再进行一次聚合,生成最终的用户特征数据表。
特征库中的特征是可视化的,我们在聚合特征时就会统计特征覆盖的人数,特征的最大最小数值等,然后同步到RDB,这样管理人员和开发者都能通过可视化来直观地了解特征。 另外,我们还提供特征监测和告警,使用最近7天的特征统计数据,对比各个特征昨天和今天的覆盖人数,是增多了还是减少了,比如性别为女这个特征的覆盖人数,如果发现今天的覆盖人数比昨天低了1%(比如昨天6亿用户,女性2亿,那么人数降低了1%*2亿=2百万)突然减少2万女性用户说明数据出现了极大的异常,何况网站的用户数每天都是增长的。这些异常都会通过邮件发送到平台和特征提取的相关人。

Spark数据挖掘平台

数据挖掘平台是完全依赖于用户特征库的,通过特征库提供用户特征,数据挖掘平台对特征进行转换并统一格式输出,就此开发人员可以快速完成模型的开发和迭代,之前需要两周开发一个模型,现在短则需要几个小时,多则几天就能完成。特征的转换包括特征名称的编码,也包括特征值的平滑和归一化,平台也提供特征离散化和特征选择的功能,这些都是使用Spark离线完成。

开发人员拿到训练样本之后,可以使用Spark mllib或者Python sklearn等完成模型训练,得到最优化模型之后,将模型保存为平台定义好的模型存储格式,并提供相关配置参数,通过平台即可完成模型上线,模型可以按天或者按周进行调度。当然如果模型需要重新训练或者其它调整,那么开发者还可以把模型下线。不只如此,平台还提供了一个模型准确率告警的功能,每次模型在预测完成之后,会计算用户提供的样本中预测的准确率,并比较开发者提供的准确率告警阈值,如果低于阈值则发邮件通知开发者,是否需要对模型重新训练。

在开发挖掘平台的模型预测功时能我们走了点弯路,平台的模型预测功能开始是兼容Spark接口的,也就是使用Spark保存和加载模型文件并预测,使用过的人知道Spark mllib的很多API都是私有的开发人员无法直接使用,所以我们这些接口进行封装然后再提供给开发者使用,但也只解决了Spark开发人员的问题,平台还需要兼容其他平台的模型输出和加载以及预测的功能,这让我们面临必需维护一个模型多个接口的问题,开发和维护成本都较高,最后还是放弃了兼容Spark接口的实现方式,我们自己定义了模型的保存格式,以及模型加载和模型预测的功能。

 数据挖掘平台结构图

以上内容介绍了美团基于Spark所做的平台化工作,这些平台和工具是面向全公司所有业务线服务的,旨在避免各团队做无意义的重复性工作,以及提高公司整体的数据生产效率。目前看来效果是比较好的,这些平台和工具在公司内部得到了广泛的认可和应用,当然也有不少的建议,推动我们持续地优化。
随着Spark的发展和推广,从上游的ETL到下游的日常数据统计分析、推荐和搜索系统,越来越多的业务线开始尝试使用Spark进行各种复杂的数据处理和分析工作。下面将以Spark在交互式用户行为分析系统以及SEM投放服务为例,介绍Spark在美团实际业务生产环境下的应用。

Spark在交互式用户行为分析系统中的实践

美团的交互式用户行为分析系统,用于提供对海量的流量数据进行交互式分析的功能,系统的主要用户为公司内部的PM和运营人员。普通的BI类报表系统,只能够提供对聚合后的指标进行查询,比如PV、UV等相关指标。但是PM以及运营人员除了查看一些聚合指标以外,还需要根据自己的需求去分析某一类用户的流量数据,进而了解各种用户群体在App上的行为轨迹。根据这些数据,PM可以优化产品设计,运营人员可以为自己的运营工作提供数据支持,用户核心的几个诉求包括:

  1. 自助查询,不同的PM或运营人员可能随时需要执行各种各样的分析功能,因此系统需要支持用户自助使用。
  2. 响应速度,大部分分析功能都必须在几分钟内完成。
  3. 可视化,可以通过可视化的方式查看分析结果。

要解决上面的几个问题,技术人员需要解决以下两个核心问题:

  1. 海量数据的处理,用户的流量数据全部存储在Hive中,数据量非常庞大,每天的数据量都在数十亿的规模。
  2. 快速计算结果,系统需要能够随时接收用户提交的分析任务,并在几分钟之内计算出他们想要的结果。

要解决上面两个问题,目前可供选择的技术主要有两种:MapReduce和Spark。在初期架构中选择了使用MapReduce这种较为成熟的技术,但是通过测试发现,基于MapReduce开发的复杂分析任务需要数小时才能完成,这会造成极差的用户体验,用户无法接受。

因此我们尝试使用Spark这种内存式的快速大数据计算引擎作为系统架构中的核心部分,主要使用了Spark Core以及Spark SQL两个组件,来实现各种复杂的业务逻辑。实践中发现,虽然Spark的性能非常优秀,但是在目前的发展阶段中,还是或多或少会有一些性能以及OOM方面的问题。因此在项目的开发过程中,对大量Spark作业进行了各种各样的性能调优,包括算子调优、参数调优、shuffle调优以及数据倾斜调优等,最终实现了所有Spark作业的执行时间都在数分钟左右。并且在实践中解决了一些shuffle以及数据倾斜导致的OOM问题,保证了系统的稳定性。

结合上述分析,最终的系统架构与工作流程如下所示:

  1. 用户在系统界面中选择某个分析功能对应的菜单,并进入对应的任务创建界面,然后选择筛选条件和任务参数,并提交任务。
  2. 由于系统需要满足不同类别的用户行为分析功能(目前系统中已经提供了十个以上分析功能),因此需要为每一种分析功能都开发一个Spark作业。
  3. 采用J2EE技术开发了Web服务作为后台系统,在接收到用户提交的任务之后,根据任务类型选择其对应的Spark作业,启动一条子线程来执行Spark-submit命令以提交Spark作业。
  4. Spark作业运行在Yarn集群上,并针对Hive中的海量数据进行计算,最终将计算结果写入数据库中。
  5. 用户通过系统界面查看任务分析结果,J2EE系统负责将数据库中的计算结果返回给界面进行展现。

 交互式用户行为分析系统架构

该系统上线后效果良好:90%的Spark作业运行时间都在5分钟以内,剩下10%的Spark作业运行时间在30分钟左右,该速度足以快速响应用户的分析需求。通过反馈来看,用户体验非常良好。目前每个月该系统都要执行数百个用户行为分析任务,有效并且快速地支持了PM和运营人员的各种分析需求。

Spark在SEM投放服务中的应用

流量技术组负责着美团站外广告的投放技术,目前在SEM、SEO、DSP等多种业务中大量使用了Spark平台,包括离线挖掘、模型训练、流数据处理等。美团SEM(搜索引擎营销)投放着上亿的关键词,一个关键词从被挖掘策略发现开始,就踏上了精彩的SEM之旅。它经过预估模型的筛选,投放到各大搜索引擎,可能因为市场竞争频繁调价,也可能因为效果不佳被迫下线。而这样的旅行,在美团每分钟都在发生。如此大规模的随机“迁徙”能够顺利进行,Spark功不可没。

 SEM服务架构图

Spark不止用于美团SEM的关键词挖掘、预估模型训练、投放效果统计等大家能想到的场景,还罕见地用于关键词的投放服务,这也是本段介绍的重点。一个快速稳定的投放系统是精准营销的基础。

美团早期的SEM投放服务采用的是单机版架构,随着关键词数量的极速增长,旧有服务存在的问题逐渐暴露。受限于各大搜索引擎API的配额(请求频次)、账户结构等规则,投放服务只负责处理API请求是远远不够的,还需要处理大量业务逻辑。单机程序在小数据量的情况下还能通过多进程勉强应对,但对于如此大规模的投放需求,就很难做到“兼顾全局”了。

新版SEM投放服务在15年Q2上线,内部开发代号为Medusa。在Spark平台上搭建的Medusa,全面发挥了Spark大数据处理的优势,提供了高性能高可用的分布式SEM投放服务,具有以下几个特性:

  1. 低门槛,Medusa整体架构的设计思路是提供数据库一样的服务。在接口层,让RD可以像操作本地数据库一样,通过SQL来“增删改查”线上关键词表,并且只需要关心自己的策略标签,不需要关注关键词的物理存储位置。Medusa利用Spark SQL作为服务的接口,提高了服务的易用性,也规范了数据存储,可同时对其他服务提供数据支持。基于Spark开发分布式投放系统,还可以让RD从系统层细节中解放出来,全部代码只有400行。
  2. 高性能、可伸缩,为了达到投放的“时间”、“空间”最优化,Medusa利用Spark预计算出每一个关键词在远程账户中的最佳存储位置,每一次API请求的最佳时间内容。在配额和账号容量有限的情况下,轻松掌控着亿级的在线关键词投放。通过控制Executor数量实现了投放性能的可扩展,并在实战中做到了全渠道4小时全量回滚。
  3. 高可用,有的同学或许会有疑问:API请求适合放到Spark中做吗?因为函数式编程要求函数是没有副作用的纯函数(输入是确定的,输出就是确定的)。这确实是一个问题,Medusa的思路是把请求API封装成独立的模块,让模块尽量做到“纯函数”的无副作用特性,并参考面向轨道编程的思路,将全部请求log重新返回给Spark继续处理,最终落到Hive,以此保证投放的成功率。为了更精准的控制配额消耗,Medusa没有引入单次请求重试机制,并制定了服务降级方案,以极低的数据丢失率,完整地记录了每一个关键词的旅行。

结论和展望

本文我们介绍了美团引入Spark的起源,基于Spark所做的一些平台化工作,以及Spark在美团具体应用场景下的实践。总体而言,Spark由于其灵活的编程接口、高效的内存计算,能够适用于大部分数据处理场景。在推广和使用Spark的过程中,我们踩过不少坑,也遇到过很多问题,但填坑和解决问题的过程,让我们对Spark有了更深入的理解,我们也期待着Spark在更多的应用场景中发挥重要的作用。

原文出自:https://tech.meituan.com/archives

Spark性能优化指南——基础篇

美团点评阅读(264)

前言

在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark。大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快、性能更高。

然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。

Spark的性能调优实际上是由很多部分组成的,不是调节几个参数就可以立竿见影提升作业性能的。我们需要根据不同的业务场景以及数据情况,对Spark作业进行综合性的分析,然后进行多个方面的调节和优化,才能获得最佳性能。

笔者根据之前的Spark作业开发经验以及实践积累,总结出了一套Spark作业的性能优化方案。整套方案主要分为开发调优、资源调优、数据倾斜调优、shuffle调优几个部分。开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则,是高性能Spark作业的基础;数据倾斜调优,主要讲解了一套完整的用来解决Spark作业数据倾斜的解决方案;shuffle调优,面向的是对Spark的原理有较深层次掌握和研究的同学,主要讲解了如何对Spark作业的shuffle运行过程以及细节进行调优。

本文作为Spark性能优化指南的基础篇,主要讲解开发调优以及资源调优。

开发调优

调优概述

Spark性能优化的第一步,就是要在开发Spark作业的过程中注意和应用一些性能优化的基本原则。开发调优,就是要让大家了解以下一些Spark基本开发原则,包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到自己的Spark作业中。

原则一:避免创建重复的RDD

通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD;以此类推,循环往复,直到计算出最终我们需要的结果。在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。

我们在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。

一些Spark初学者在刚开始开发Spark作业时,或者是有经验的工程师在开发RDD lineage极其冗长的Spark作业时,可能会忘了自己之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD。这就意味着,我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。

一个简单的例子

// 需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次reduce操作。也就是说,需要对一份数据执行两次算子操作。

// 错误的做法:对于同一份数据执行多次算子操作时,创建多个RDD。
// 这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,然后分别对每个RDD都执行了一个算子操作。
// 这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)

// 正确的用法:对于一份数据执行多次算子操作时,只使用一个RDD。
// 这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个RDD,然后对这一个RDD执行了多次算子操作。
// 但是要注意到这里为止优化还没有结束,由于rdd1被执行了两次算子操作,第二次执行reduce操作的时候,还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销。
// 要彻底解决这个问题,必须结合“原则三:对多次使用的RDD进行持久化”,才能保证一个RDD被多次使用时只被计算一次。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
rdd1.reduce(...)

原则二:尽可能复用同一个RDD

除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

一个简单的例子

// 错误的做法。

// 有一个<Long, String>格式的RDD,即rdd1。
// 接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)

// 分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)

// 正确的做法。

// 上面这个case中,其实rdd1和rdd2的区别无非就是数据格式不同而已,rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。
// 此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。

// 其实在这种情况下完全可以复用同一个RDD。
// 我们可以使用rdd1,既做reduceByKey操作,也做map操作。
// 在进行第二个map操作时,只使用每个数据的tuple._2,也就是rdd1中的value值,即可。
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

// 第二种方式相较于第一种方式而言,很明显减少了一次rdd2的计算开销。
// 但是到这里为止,优化还没有结束,对rdd1我们还是执行了两次算子操作,rdd1实际上还是会被计算两次。
// 因此还需要配合“原则三:对多次使用的RDD进行持久化”进行使用,才能保证一个RDD被多次使用时只被计算一次。

原则三:对多次使用的RDD进行持久化

当你在Spark代码中多次对一个RDD做了算子操作后,恭喜,你已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。

Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。

因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。

对多次使用的RDD进行持久化的代码示例

// 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。

// 正确的做法。
// cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
// 此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。
// 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)

// persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
// 比如说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。
// 而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。
// 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别。

Spark的持久化级别

持久化级别含义解释
MEMORY_ONLY使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
MEMORY_AND_DISK使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
DISK_ONLY使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

如何选择一种最合适的持久化策略

  • 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

  • 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

  • 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

  • 通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

原则四:尽量避免使用shuffle类算子

如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。

shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。

因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

Broadcast与map进行join代码示例

// 传统的join操作会导致shuffle操作。
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)

// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。

原则五:使用map-side预聚合的shuffle操作

如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。

所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

比如如下两幅图,就是典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。

groupByKey实现wordcount原理

reduceByKey实现wordcount原理

原则六:使用高性能的算子

除了shuffle相关的算子有优化原则之外,其他的算子也都有着相应的优化原则。

使用reduceByKey/aggregateByKey替代groupByKey

详情见“原则五:使用map-side预聚合的shuffle操作”。

使用mapPartitions替代普通map

mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!

使用foreachPartitions替代foreach

原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。

使用filter之后进行coalesce操作

通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。

使用repartitionAndSortWithinPartitions替代repartition与sort类操作

repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

原则七:广播大变量

有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。

在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。

因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

广播大变量的代码示例

// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

原则八:使用Kryo优化序列化性能

在Spark中,主要有三个地方涉及到了序列化:

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
  • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
  • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原则九:优化数据结构

Java中,有三种类型比较耗费内存:

  • 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
  • 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
  • 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。

但是在笔者的编码实践中发现,要做到该原则其实并不容易。因为我们同时要考虑到代码的可维护性,如果一个代码中,完全没有任何对象抽象,全部是字符串拼接的方式,那么对于后续的代码维护和修改,无疑是一场巨大的灾难。同理,如果所有操作都基于数组实现,而不使用HashMap、LinkedList等集合类型,那么对于我们的编码难度以及代码可维护性,也是一个极大的挑战。因此笔者建议,在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。

资源调优

调优概述

在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。很多Spark初学者,通常不知道该设置哪些必要的参数,以及如何设置这些参数,最后就只能胡乱设置,甚至压根儿不设置。资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。总之,无论是哪种情况,都会导致Spark作业的运行效率低下,甚至根本无法运行。因此我们必须对Spark作业的资源使用原理有一个清晰的认识,并知道在Spark作业运行过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。

Spark作业基本运行原理

Spark基本运行原理

详细原理见上图。我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。

因此Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。

task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。

以上就是Spark作业的基本运行原理的说明,大家可以结合上图来理解。理解作业基本原理,是我们进行资源参数调优的基本前提。

资源参数调优

了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。

num-executors

  • 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
  • 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

  • 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
  • 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

executor-cores

  • 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
  • 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

driver-memory

  • 参数说明:该参数用于设置Driver进程的内存。
  • 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

spark.default.parallelism

  • 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
  • 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

spark.storage.memoryFraction

  • 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
  • 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
  • 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

资源参数参考示例

以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

写在最后的话

根据实践经验来看,大部分Spark作业经过本次基础篇所讲解的开发调优与资源调优之后,一般都能以较高的性能运行了,足以满足我们的需求。但是在不同的生产环境和项目背景下,可能会遇到其他更加棘手的问题(比如各种数据倾斜),也可能会遇到更高的性能要求。为了应对这些挑战,需要使用更高级的技巧来处理这类问题。在后续的《Spark性能优化指南——高级篇》中,我们会详细讲解数据倾斜调优以及Shuffle调优。

原文出自:https://tech.meituan.com/archives

Spark性能优化指南——高级篇

美团点评阅读(278)

前言

基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为《Spark性能优化指南》的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题。

数据倾斜调优

调优概述

有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。

数据倾斜发生时的现象

  • 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。

  • 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。

数据倾斜发生的原理

数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。

数据倾斜原理

如何定位导致数据倾斜的代码

数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

某个task执行特别慢的情况

首先要看的,就是数据倾斜发生在第几个stage中。

如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个stage;如果是用yarn-cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。

比如下图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。精准推算stage与代码的对应关系,需要对Spark的源码有深入的理解,这里我们可以介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。

这里我们就以Spark最基础的入门程序——单词计数来举例,如何用最简单的方法大致推算出一个stage对应的代码。如下示例,在整个代码中,只有一个reduceByKey是会发生shuffle的算子,因此就可以认为,以这个算子为界限,会划分出前后两个stage。

  • stage0,主要是执行从textFile到map操作,以及执行shuffle write操作。shuffle write操作,我们可以简单理解为对pairs RDD中的数据进行分区操作,每个task处理的数据中,相同的key会写入同一个磁盘文件内。
  • stage1,主要是执行从reduceByKey到collect操作,stage1的各个task一开始运行,就会首先执行shuffle read操作。执行shuffle read操作的task,会从stage0的各个task所在节点拉取属于自己处理的那些key,然后对同一个key进行全局性的聚合或join等操作,在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子之后,就计算出了最终的wordCounts RDD,然后会执行collect算子,将所有数据拉取到Driver上,供我们遍历和打印输出。
val conf = new SparkConf()
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.collect().foreach(println(_))

通过对单词计数程序的分析,希望能够让大家了解最基本的stage划分的原理,以及stage划分后shuffle操作是如何在两个stage的边界处执行的。然后我们就知道如何快速定位出发生数据倾斜的stage对应代码的哪一个部分了。比如我们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,判定stage1出现了数据倾斜,那么就可以回到代码中定位出stage1主要包括了reduceByKey这个shuffle类算子,此时基本就可以确定是由educeByKey算子导致的数据倾斜问题。比如某个单词出现了100万次,其他单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。

某个task莫名其妙内存溢出的情况

这种情况下去定位出问题的代码就比较容易了。我们建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn-cluster模式下的log中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有shuffle类算子,此时很可能就是这个算子导致了数据倾斜。

但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

查看导致数据倾斜的key的数据分布情况

知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决。

此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:

  1. 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
  2. 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

举例来说,对于上面所说的单词计数程序,如果确定了是stage1的reduceByKey算子导致了数据倾斜,那么就应该看看进行reduceByKey操作的RDD中的key分布情况,在这个例子中指的就是pairs RDD。如下示例,我们可以先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

数据倾斜的解决方案

解决方案一:使用Hive ETL预处理数据

方案适用场景:导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。

方案缺点:治标不治本,Hive ETL中还是会发生数据倾斜。

方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。

项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6倍以上。

解决方案二:过滤少数导致倾斜的key

方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。

方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

方案缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。

方案实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。

解决方案三:提高shuffle操作的并行度

方案适用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。

方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。

方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。具体原理如下图所示。

方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。

方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。

方案实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

解决方案四:两阶段聚合(局部聚合+全局聚合)

方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。

方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

// 第一步,给RDD中的每个key都打上一个随机前缀。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 第二步,对打上随机前缀的key进行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// 第三步,去除RDD中每个key的随机前缀。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// 第四步,对去除了随机前缀的RDD进行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

解决方案五:将reduce join转为map join

方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。

方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。具体原理如下图所示。

方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。

方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

// 首先将数据量比较小的RDD的数据,collect到Driver中来。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
// 可以尽可能节省内存空间,并且减少网络传输性能开销。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// 对另外一个RDD执行map类操作,而不再是join类操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for(Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // 获取当前RDD数据的key以及value。
                String key = tuple._1;
                String value = tuple._2;
                // 从rdd1数据Map中,根据key获取到可以join到的数据。
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });

// 这里得提示一下。
// 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。
// 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。
// rdd2中每条数据都可能会返回多条join后的数据。

解决方案六:采样倾斜key并分拆join操作

方案适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

方案实现思路:

  • 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
  • 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
  • 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
  • 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
  • 而另外两个普通的RDD就照常join即可。
  • 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

方案实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。

方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

// 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
// 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }     
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair( 
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            } 
        });

// rdd2,就是那个所有key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }

        });

// 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。
// 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedUserid2infoRDD)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });

// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// 将倾斜key join后的结果与普通key join后的结果,uinon起来。
// 就是最终的join结果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

解决方案七:使用随机前缀和扩容RDD进行join

方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

  • 该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。
  • 然后将该RDD的每条数据都打上一个n以内的随机前缀。
  • 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
  • 最后将两个处理后的RDD进行join即可。

方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。

方案实践经验:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。

// 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 将两个处理后的RDD进行join即可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

解决方案八:多种方案组合使用

在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。比如说,我们针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次可以对某些shuffle操作提升并行度,优化其性能;最后还可以针对不同的聚合或join操作,选择一种方案来优化其性能。大家需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

shuffle调优

调优概述

大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则,千万不要舍本逐末。下面我们就给大家详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

ShuffleManager发展概述

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进。

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

下面我们详细分析一下HashShuffleManager和SortShuffleManager的原理。

HashShuffleManager运行原理

未经优化的HashShuffleManager

下图说明了未经优化的HashShuffleManager的原理。这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

我们先从shuffle write开始说起。shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

接着我们来说说shuffle read。shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

优化后的HashShuffleManager

下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

SortShuffleManager运行原理

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

普通运行机制

下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

bypass运行机制

下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
  • 不是聚合类的shuffle算子(比如reduceByKey)。

此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

shuffle相关参数调优

以下是Shffule过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
  • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

写在最后的话

本文分别讲解了开发过程中的优化原则、运行前的资源参数设置调优、运行中的数据倾斜的解决方案、为了精益求精的shuffle调优。希望大家能够在阅读本文之后,记住这些性能调优的原则以及方案,在Spark作业开发、测试以及运行的过程中多尝试,只有这样,我们才能开发出更优的Spark作业,不断提升其性能。

原文出自:https://tech.meituan.com/archives

大众点评支付渠道网关系统的实践之路

美团点评阅读(284)

业务的快速增长,要求系统在快速迭代的同时,保持很好的扩展性和可用性。其中,交易系统除了满足上述要求之外,还必须保持数据的强一致性。对系统开发人员而言,这既是机遇,也是挑战。本文主要梳理大众点评支付渠道网关系统在面对这些成长烦恼时的演进之路,以及过程中的一些思考和实践。

在整个系统的演进过程中,核心思路是:大系统做小,做简单(具体描述可参考《高可用性系统在大众点评的实践与经验》)。在渠道网关系统实践过程中,可以明显区分出几个有代表性的阶段。

一、能用阶段

早期业务流量还不是很大,渠道网关系统业务逻辑也很简单,一句话总结就是:让用户在交易的时候,能顺利把钱给付了。做的事情可简单概括成3件:发起支付请求、接收支付成功通知以及用户要求退款时原路退回给用户的支付账户。这个阶段系统实践比较简单,主要就是“短、平、快”,快速接入新的第三方支付渠道并保证能用。系统架构如图1。
能用阶段

二、可用阶段

在系统演进初期的快速迭代过程中,接入的第三方支付渠道不多,系统运行还算比较平稳,一些简单问题也可通过开发人员人工快速解决。但随着接入的第三方支付渠道不断增多,逐渐暴露出一些新的问题:

(1) 所有的业务逻辑都在同一个物理部署单元,不同业务之间互相影响(例如退款业务出现问题,但是与此同时把支付业务也拖垮了);

(2) 随着业务流量的增大,数据库的压力逐渐增大,数据库的偶尔波动造成系统不稳定,对用户的支付体验影响很大;

(3) 支付、退款等状态的同步很大程度上依赖第三方支付渠道的异步通知,一旦第三方支付渠道出现问题,造成大量客诉,用户体验很差,开发、运营都很被动。

针对(1)中的业务之间互相影响问题,我们首先考虑进行服务拆分,将之前一个大的物理部署单元拆成多个物理部署单元。有两种明显的可供选择的拆分策略:

  • 按照渠道拆分,不同的第三方支付渠道独立一个物理部署单元,例如微信一个部署单元,支付宝一个部署单元等;
  • 按照业务类型拆分,不同的业务独立一个物理部署单元,例如支付业务一个部署单元,退款业务一个部署单元等。

考虑到在当时的流量规模下,支付业务优先级最高,退款等业务的优先级要低;而有些渠道的流量占比很小,作为一个独立的部署单元,会造成一定的资源浪费,且增加了系统维护的复杂度。基于此,我们做了一个符合当时系统规模的trade-off:选择了第2种拆分策略 — 按照业务类型拆分。

针对(2)中的DB压力问题,我们和DBA一起分析原因,最终选择了Master-Slave方案。通过增加Slave来缓解查询压力;通过强制走Master来保证业务场景的强一致性;通过公司的DB中间件Zebra来做负载均衡和灾备切换,保证DB的高可用性。

针对(3)中的状态同步问题,我们对不同渠道进行梳理,在已有的第三方支付渠道异步通知的基础上,通过主动查询定时批量同步状态,解决了绝大部分状态同步问题。对于仍未同步的少量Case,系统开放出供内部使用的API,方便后台接入和开发人员手动补单。

在完成上述的实践之后,渠道网关系统已达到基本可用阶段,通过内部监控平台可以看到,核心服务接口可用性都能达到99.9%以上。演化之后的系统架构如图2。
可用阶段

三、柔性可用阶段

在解决了业务隔离、DB压力、状态同步等问题后,渠道网关系统度过一段稳定可用的时期。但架不住业务飞速增长的压力,之前业务流量规模下的一些小的系统波动、流量冲击等异常,在遭遇流量洪峰时被急剧放大,最终可能成为压垮系统的最后一根稻草。在新的业务流量规模下,我们面临着新的挑战:

(1) 随着团队的壮大,新加入的同学在接入新的渠道或者增加新的逻辑时,往往都会优先选用自己熟悉的方式完成任务。但熟悉的不一定是合理的,有可能会引入新的风险。特别是在与第三方渠道对接时,系统目前在使用的HTTP交互框架就有 JDK HttpURLConnection/HttpsURLConnection、Httpclient3.x、Httpclient4.x(4.x版本内部还分别有使用不同的小版本)。仅在这个上面就踩过好几次惨痛的坑。

(2) 在按业务类型进行服务拆分后,不同业务不再互相影响。但同一业务内部,之前流量规模小的时候,偶尔波动一次影响不大,现在流量增大后,不同渠道之间就开始互相影响。例如支付业务,对外统一提供分布式的支付API,所有渠道共享同一个服务RPC连接池,一旦某一个渠道的支付接口性能恶化,导致大量占用服务RPC连接,其他正常渠道的请求都无法进来;而故障渠道性能恶化直接导致用户无法通过该渠道支付成功,连锁反应导致用户多次重试,从而进一步导致恶化加剧,最终引起系统雪崩,拒绝服务,且重启后的服务还有可能被大量的故障渠道重试请求给再次击垮。

(3) 目前接入的第三方支付渠道,无论是第三方支付公司、银行或是其他外部支付机构,基本都是通过重定向或SDK的方式引导用户完成最终支付动作。在这条支付链路中,渠道网关系统只是在后端与第三方支付渠道进行交互(生成支付重定向URL或预支付凭证),且只能通过第三方支付渠道的异步通知或自己主动进行支付查询才能得知最终用户支付结果。一旦某个第三方支付渠道内部发生故障,渠道网关系统完全无法得知该支付链路已损坏,这对用户支付体验造成损害。

(4) 现有的渠道网关的DB,某些非渠道网关服务仍可直接访问,这对渠道网关系统的DB稳定性、DB容量规划等带来风险,进而影响渠道网关系统的可用性,内部戏称被戴了“绿帽子”。

(5) 对于退款链路,系统目前未针对退款异常case进行统一收集、整理并分类,且缺乏一个清晰的退款链路监控。这导致用户申请退款后,少量用户的退款请求最终未处理成功,用户发起客诉。同时由于缺乏监控,导致这种异常退款缺乏一个后续推进措施,极端情形下,引起用户二次客诉,极大损害用户体验和公司信誉度。

为最大程度解决问题(1)中描述的风险,在吸取踩坑的惨痛教训后,我们针对第三方渠道对接,收集并整理不同的应用场景,抽象出一套接入框架。接入框架定义了请求组装、请求执行、响应解析和错误重试这一整套网关交互流程,屏蔽了底层的HTTP或Socket交互细节,并提供相应的扩展点。针对银行渠道接入存在前置机这种特殊的应用场景,还基于Netty抽象出连接池(Conn Pool)和简单的负载均衡机制(LB, 提供Round Robin路由策略)。不同渠道在接入时可插入自定义的组装策略(扩展已有的HttpReq、HttpsReq或NettyReq),执行策略[扩展已有(Http、Https或Netty)Sender/Receiver],解析策略(扩展已有的HttpResp、HttpsResp或NettyResp),并复用框架已提供的内容解析(binary/xml/json parser)、证书加载(keystore/truststore loader)和加解密签名(encrypt/decrypt/sign/verify sign)组件,从而在达到提高渠道接入效率的同时,尽可能减少新渠道接入带来的风险。接入框架的流程结构如图3。
渠道接入框架

为解决问题(2)中渠道之间相互影响,一个简单直观的思路就是渠道隔离。如何隔离,隔离到什么程度?这是2个主要的问题点:

  • 如何隔离 考虑过将支付服务进一步按照渠道拆分,将系统继续做小,但是拆分后,支付API的调用端需要区分不同渠道调用不同的支付API接口,这相当于将渠道隔离问题抛给了调用端;同时拆分后服务增多,调用端需要维护同一渠道支付业务的多个不同RPC-API,复杂度提高,增加了开发人员的维护负担,这在当前的业务流量规模下不太可取。所以我们选择了在同一个支付服务API内部进行渠道隔离。由于共用同一个支付服务服务API连接池,渠道隔离的首要目标就是避免故障渠道大量占用AP连接池,对其他正常渠道造成株连影响。如果能够自动检测出故障渠道,并在其发生故障的初期阶段就快速失败该故障渠道的请求,则从业务逻辑上就自动完成了故障渠道的隔离。
  • 隔离到什么程度 一个支付渠道下存在不同的支付方式(信用卡支付、借记卡支付、余额支付等),而有些支付方式(例如信用卡支付)还存在多个银行。所以我们直接将渠道隔离的最小粒度定义到支付渠道 -> 支付方式 -> 银行。

基于上述的思考,我们设计并实现了一个针对故障渠道的快速失败(fail-fast)机制:

  • 将每一笔支付请求所附带的支付信息抽象为一个特定的fail-fast路径,请求抽象成一个fail-fast事务,请求成功即认为事务成功,反之,事务失败。
  • 在fail-fast事务执行过程中,级联有2个fail-fast断路开关:
    • 静态开关,根据人工配置(on/off),断定某个支付请求是否需快速失败。
    • 动态开关,根据历史统计信息,确定当前健康状态,进而断定是否快速失败当前支付请求。
  • 动态断路开关抽象了3种健康状态(closed-放行所有请求;half_open-部分比例的请求放行;open-快速失败所有请求),并依据历史统计信息(总请求量/请求失败量/请求异常量/请求超时量),在其内部维护了一个健康状态变迁的状态机。状态变迁如图4。
    FailFast状态变迁
  • 状态机的每一次状态变迁都会产生一个健康状态事件,收银台服务可以监听这个健康状态事件,实现支付渠道的联动上下线切换。
  • 每一笔支付请求结束后都会动态更新历史统计信息。

经过线上流量模拟压测观察,fail-fast机制给系统支付请求增加了1~5ms的额外耗时,相比第三方渠道的支付接口耗时,占比1%~2%,属于可控范围。渠道故障fail-fast机制上线之后,结合压测配置,经过几次微调,稳定了线上环境的fail-fast配置参数。

在前不久的某渠道支付故障时,通过公司内部的监控平台,明显观察到fail-fast机制起到很好的故障隔离效果,如下图5。
故障隔离效果

为解决问题(3)中支付链路可用性监测,依赖公司内部的监控平台上报,实时监控支付成功通知趋势曲线;同时渠道网关系统内部从业务层面自行实现了支付链路端到端的监控。秒级监控支付链路端到端支付成功总量及支付成功率,并基于这2个指标的历史统计信息,提供实时的支付链路邮件或短信报警。而在流量高峰时,该监控还可通过人工手动降级(异步化或关闭)。这在很大程度上提高了开发人员的核心支付链路故障响应速度。

为解决问题(4)中的“绿帽子”,渠道网关系统配合DBA回收所有外部系统的DB直接访问权限,提供替换的API以供外部系统访问,这给后续的提升DB稳定性、DB容量规划以及后续可能的异步多机房部署打下基础。

针对问题(5)中退款case,渠道网关系统配合退款链路上的其他交易、支付系统,从源头上对第三方渠道退款异常case进行统一收集、整理并分类,并形成退款链路核心指标(退款当日成功率/次日成功率/7日成功率)监控,该部分的系统实践会随着后续的“退款链路统一优化”一起进行分享;

随着上述实践的逐步完成,渠道网关系统的可用性得到显著提高,核心链路的API接口可用性达到99.99%,在公司的917大促中,渠道网关系统平稳度过流量高峰,并迎来了新的记录:提交第三方渠道支付请求的TPS达到历史新高。且在部分渠道接口发生故障时,能保证核心支付API接口的稳定性,并做到故障渠道的自动检测、恢复,实现收银台对应渠道的联动上下线切换。同时,通过核心支付链路支付成功率监控,实现第三方渠道内部故障时,渠道上下线的手动切换。至此,基本保证了在部分第三方渠道有损的情况下,渠道网关系统的柔性可用。演化后的此阶段系统架构如图6。
柔性可用阶段

四、经验与总结

在整个渠道网关系统一步步的完善过程中,踩过很多坑,吃过很多教训,几点小的收获:

  1. 坚持核心思想,拆分、解耦,大系统做小,做简单;
  2. 系统总会有出问题的时候,重要的是如何快速定位、恢复、解决问题,这是一个长期而又艰巨的任务;
  3. 高可用性的最大敌人不仅是技术,还是使用技术实现系统的人,如何在业务、系统快速迭代的过程中,保证自我驱动,不掉队;
  4. 高流量,大并发对每一个工程师既是挑战,更是机遇。

原文出自:https://tech.meituan.com/archives

专访美团外卖曹振团:天下武功唯快不破

美团点评阅读(283)

本文转自InfoQ中文网站,首发地址:http://www.infoq.com/cn/news/2016/06/Meituan-take-away

马云曾经说过:世界是懒人创造出来的。在“懒人”们的推动下,O2O的战火已经燃烧到了外卖行业。据报告,2015年外卖市场年交易额已经达到450余亿元,其中美团外卖从2013年底上线以来,已经迅速超过了400万日订单。作为美团内部的创业项目,美团外卖是如何一步步从复用美团的基础服务,到如今具有在高并发下百万级别订单的业务系统的。

2016年7月15-16日,ArchSummit全球架构师峰会将在深圳举行。本届大会,我们邀请了美团外卖技术专家曹振团老师,前来分享《美团外卖系统架构演进与系统稳定性经验谈》的内容,讲述在美团外卖业务的高速发展中,系统架构各阶段面临的不同挑战以及对应的解决方案。

现在我们就来采访曹振团老师,让老师给大家送一份充满干货的“外卖”。

InfoQ:在你认为,怎样的架构是一个好架构,怎样的架构师是一个好的架构师?能否谈谈如何避免一个“坏”架构的诞生?

曹振团:一个好的架构是与业务特点及与业务发展阶段相结合的、因地制宜且与时俱进的。一个好的架构既能支撑业务的快速稳定发展,又能留有一定的灵活性和扩展性。而好的架构师需要做好三点:

  1. 首先需要充分理解业务,只有充分理解业务之后才能使架构不仅能够很好地支持业务特点,并具有一定的前瞻性。架构师需要站在推进业务发展的角度上合理地改进和优化架构设计,为业务的快速发展做好保障。
  2. 其次,架构师要具备良好的沟通和协调能力。架构师往往要面临着跨组、跨团队、跨事业部甚至跨事业群的一些技术方案,需要沟通和协调各方的诉求和冲突。
  3. 最后要具备持续学习的能力。不仅要学习技术知识的变化,更要学习业界的发展思路、最佳实践等。

好的架构是实践出来的,好的架构师也是在实践中快速成长起来的。新美大内部对架构师的培养非常注重实践和交流。架构师都在业务一线深入了解业务、需求及技术特性。美团外卖业务在快速发展的过程中踩了很多坑,这也让我们的架构师更快的成长起来。新美大内部各事业群的Casestudy都是共享的,这也是一个非常好的学习平台,从他人的经验和总结中学习。另外内部开设有架构专题分享,提供给大家一个架构实践的交流和讨论的平台。

架构通常是为了解决系统的高并发、高性能、高可用的问题,结合业务特点在研发资源、排期、技术方案之间做平衡。一个“坏”的架构则破坏了这种平衡,比如:由于工期紧张而引入了一个自己并不能把控的技术方案,为系统的稳定性埋下了雷。还有一个简单的判断标准是:当采用这个架构后在未来多长时间或几倍增量下需要调整架构。基本上要求至少在未来的半年到一年内或2倍增量下不需要调整架构。如果架构设计评审不符合这个标准就要及时重新设计或调整。

InfoQ:你曾在网易工作过,能否简单谈谈在架构设计上美团外卖与网易各自的特点是什么?

曹振团:门户网站面对着海量的新闻事件及分类,具有更新量大、过期快和热点集中的特点。新闻资料基本是静态的,所以对缓存技术应用较多。

美团外卖涉及到交易的信息、支付、履约的各个环节,参与方较多。从用户支付下单到商家接单,还有骑手配送等环节,这些过程都是动态有交互的,美团外卖对服务化及服务治理设计较多。

InfoQ:你加入美团后有参与多个创新业务的探索,那么你认为在业务初期好的架构是必要的吗,为什么呢?能否分享经历了多个创新业务之后的收获,这些经验给美团外卖带来了什么样的收获和挑战?

曹振团:2013年加入美团时,我们在O2O这个方向上进行了多个创新业务的探索,完全是新的事情,从零开始。在早期最重要的事情就是验证需求,验证产品是否能够满足用户的核心需求,是否能够被用户接受。这一阶段就是快速试错,通常以MVP的方式快速迭代。我们需要足够快地找到方向,此时灵活性优先于架构。

加入新美大的早期参与公司内部的创业,在O2O这个领域进行了多个创新业务的探索,最后决定做外卖这个业务。在经历创新业务后最大的收获主要讲两点:

  1. 创业的早期决定不做什么往往比决定做什么更重要。在早期产品技术资源都很紧缺,需要将有限的资源投入到最核心的产品需求上。
  2. 天下武功唯快不破。如今的市场瞬息万变,唯有快速的迭代,方能抢占先机。

创业会给技术带来大量的挑战:首先需要是个全面手,前端、后台、数据库、运维都能够迅速切入。另外也要有足够的技术视野和深度,才能够在技术方案的选择上游刃有余,既能快速的选择合适的方案,又能够把握其技术原理。

InfoQ:美团外卖创建初期是否借鉴了美团的架构设计?美团外卖架构的创建和发展经历了什么样的过程?

曹振团:美团外卖创建初期复用了很多美团的基础服务,主要借鉴了团购业务积累的经验。 美团外卖的架构是随着业务的发展优化和演进的。主要经历了以下几个过程:

  1. 自由发展阶段:业务起步的时候,大家公用服务和数据库表,这样能够快速支持产品迭代。产品和技术人员聚焦在快速验证产品功能上。到底应该有哪些功能,用户会如何点外卖等?这个阶段主要的特点就是集中,所有的功能都集中在几个项目里,所有的表都集中在一个库中。
  2. 故障驱动架构:随着业务的爆发增长,早期的架构出现了很多的问题,系统频繁地出现稳定性的问题,共享数据库表导致业务逻辑散落各地、甚至实现不一致的情况。这时系统稳定性问题倒逼架构进行优化调整,进行了服务化拆分,服务之间全部用接口的方式调用。
  3. 架构驱动改革:随着单量的快速增长,系统故障所造成的损失是巨大的、不可接受的。需要从架构驱动技术体系的改进、甚至推进产品和业务的变革。同时增加业务的容灾能力,进行了多机房的部署。

InfoQ:在外卖交易流程中,哪个环节最容易出现问题,你们是如何解决的?

曹振团:美团外卖在快速发展的过程中踩了很多坑。比如发版引发的故障,这类问题一般是最多的。发版的新功能可能含有bug,可能含有慢SQL,可能没有正确处理网络异常等。外卖业务还是一个实时且流程较长的业务,尤其流量比较集中在中午和晚上的饭点,在高峰时段的QPS是万级别的,外卖整个链条还会涉及到很多的服务,如果某个依赖服务有问题将会影响到其它服务。例如没有设置合理的RPC超时时间,如果服务A挂了,会导致依赖该服务的其它服务出现问题,进而引起连锁反应,最终系统雪崩。

针对这些问题,我们做了关键路径梳理。整个交易流程中梳理出关键路径,非关键调用异步化,关键路径做好容错容灾设计,实时监控关键路径的核心指标。整体系统做了比较全面的应急预案,当有紧急情况发生时,力保关键路径不出问题。

至于如何避免掉坑,分享几点:

  1. 防御式编程,不要相信任何人或服务,做好对自身服务的保护和对依赖服务的熔断。
  2. 每次发版都有预案。回滚是解决发版引发故障的最快捷有效的手段。
  3. 灰度!灰度!灰度!
  4. 完善的监控很重要,需要覆盖系统性能及业务指标,并对这些指标熟知和敏感。

InfoQ: 在美团外卖业务急速发展的同时,目前美团外卖架构上是否有需要改进和突破的地方?是否有关注前沿的技术,目前通过什么渠道来发现持续优化架构的方法?

曹振团: 业务屡创新高及团队规模的扩大,给技术架构带来了越来越多的挑战。如何能够在大规模作战的情况下,既能保持业务开发的敏捷性又能保障系统的稳定性,是我们架构中要重点思考的,我们一直在结合着业务发展的节奏,优化和改进技术架构。一方面要与时俱进地支持多品类的业务扩展及平台化建设;另一方面,在永恒不变的话题“稳定性”方面,在为能够支持更高的业务量,持续抽取和优化基础服务,加强中间件的能力,使各服务能够更聚焦和专注。因此加强基础设施和强固中间件的建设是我们要改进和突破的地方。夯实技术基础设施和中间件,让研发力量更聚焦在业务开发中。

美团外卖在技术架构中大量使用了开源的技术组件,在深入调研后会结合外卖的业务特点做二次开发,感谢Java社区里丰富的开源软件,使得我们能够更好地优化我们的架构。另外一个发现和持续优化架构的方法是全链路的在线压测。我们会定期进行全链路的在线压测,发现系统的瓶颈并优化验证。

InfoQ: 在架构演进过程中分别使用了哪些核心指标来判断公司架构是否改善?你们对核心指标的要求是多少,目前是否已经稳定达到?

曹振团:架构的演进是为了能够提供更可靠的SLA,核心的考量指标有:5倍的容量,4个9的可用性,TP99 200ms的响应时间,目前已经可以稳定达到了。这些指标界定了系统能够在一定的容量下具有稳定和可靠的表现。我们会定期地做全链路的在线压测,通过实时的数据指标监控系统分析和验证是否已经达成架构目标。可用性和响应时间在服务端还是比较好衡量和达成的,但是从用户端看就变得复杂了很多。比如用户的设备、网络环境等都可能会对结果造成影响,我们也在加强用户侧的监控,持续地优化和改进用户体验。

InfoQ: 外卖领域作为一种懒人经济,你们认为这些“懒人”在数据上体现了怎样的特征?

曹振团:有一种说法是“世界是由懒人驱动的”。由于懒得去换台,发明了遥控器。懒得爬楼梯,发明了电梯。如果你懒得出去吃,我们可以送外卖上门。懒是一个强需求。

我们在解决这个强需求的过程中,发现了一些有趣的变化:由于出现了这样的外卖平台,满足了大家懒的需求,越发使大家变“懒”了,平均点外卖的次数在变多,花样在变多(大家的口味在变多)。虽然变“懒”了,但是对品质的要求却在提高。我们通过优化配送链条上的服务,使得能够更及时、快速地为“懒人”提供美食。配送变快了,口感有了更好的保证。这又促使一些之前不提供外卖服务的品牌商家加入到外卖平台上来,使得整个生态更加良性发展。

另外一个特征是,“懒人”的需求也是多样的,“懒人”们希望能够提供更多的外送选择,这将会改写“外卖”的定义,它将会含有更广阔和丰富的含义,而不只是 “美食外卖”。

InfoQ: 作为架构师,有什么感悟和经验和大家分享吗?

曹振团: 主要分享几点吧:

  1. 性能是功能的一部分,稳定是功能的一部分。性能和稳定性需要在开发设计的时候作为产品功能的一部分来考虑,而不是扩展属性。这样才能在开发、测试、上线、运行中全程把握。
  2. 简单即美。把复杂的事情简单化,抓住核心脉络,解决好主要矛盾不见得一个大而全的海纳百川的架构才是好的,相反清晰、明了的架构是我们追求的,简单可依赖。
  3. 使用能够驾驭的技术。丰富的开源社区给了我们很多的选择和视角,任何引入的开源技术都应该理解其原理和本质,只有能够驾驭的技术才能用好。
  4. 细节决定成败。软件架构是一项复杂和精细的工作,只有把每个细节都做极致了,才能保障架构的基石稳健。

InfoQ:感谢曹振团老师接受我们的采访。期待你在ArchSummit全球架构师峰会上的分享。

原文出自:https://tech.meituan.com/archives

Cache应用中的服务过载案例研究

美团点评阅读(279)

简单地说,过载是外部请求对系统的访问量突然激增,造成请求堆积,服务不可用,最终导致系统崩溃。本文主要分析引入Cache可能造成的服务过载,并讨论相关的预防、恢复策略。Cache在现代系统中使用广泛,由此引入的服务过载隐患无处不在,但却非常隐蔽,容易被忽视。本文希望能为开发者在设计和编写相关类型应用,以及服务过载发生处理时能够有章可循。

一个服务过载案例

本文讨论的案例是指存在正常调用关系的两个系统(假设调用方为A系统,服务方为B系统),A系统对B系统的访问突然超出B系统的承受能力,造成B系统崩溃。造成服务过载的原因很多,这里分析的是严重依赖Cache的系统服务过载。首先来看一种包含Cache的体系结构(如下图所示)。

A系统依赖B系统的读服务,A系统是60台机器组成的集群,B系统是6台机器组成的集群,之所以6台机器能够扛住60台机器的访问,是因为A系统并不是每次都访问B,而是首先请求Cache,只有Cache的相应数据失效时才会请求B。

这正是Cache存在的意义,它让B系统节省了大量机器;如果没有Cache,B系统不得不组成60台机器的集群,如果A也同时依赖除B系统外的另一个系统(假设为C系统)呢?那么C系统也要60台机器,放大的流量将很快耗尽公司的资源。

然而Cache的引入也不是十全十美的,这个结构中如果Cache发生问题,全部的流量将流向依赖方,造成流量激增,从而引发依赖系统的过载。

回到A和B的架构,造成服务过载的原因至少有下面三种:

  1. B系统的前置代理发生故障或者其他原因造成B系统暂时不可用,等B系统系统服务恢复时,其流量将远远超过正常值。
  2. Cache系统故障,A系统的流量将全部流到B系统,造成B系统过载。
  3. Cache故障恢复,但这时Cache为空,Cache瞬间命中率为0,相当于Cache被击穿,造成B系统过载。

第一个原因不太好理解,为什么B系统恢复后流量会猛增呢?主要原因就是缓存的超时时间。当有数据超时的时候,A系统会访问B系统,但是这时候B系统偏偏故障不可用,那么这个数据只好超时,等发现B系统恢复时,发现缓存里的B系统数据已经都超时了,都成了旧数据,这时当然所有的请求就打到了B。

下文主要介绍服务过载的预防和发生后的一些补救方法,以预防为主,从调用方和服务方的视角阐述一些可行方案。

服务过载的预防

所谓Client端指的就是上文结构中的A系统,相对于B系统,A系统就是B系统的Client,B系统相当于Server。

Client端的方案

针对上文阐述的造成服务过载的三个原因:B系统故障恢复、Cache故障、Cache故障恢复,我们看看A系统有哪些方案可以应对。

合理使用Cache应对B系统宕机

一般情况下,Cache的每个Key除了对应Value,还对应一个过期时间T,在T内,get操作直接在Cache中拿到Key对应Value并返回。但是在T到达时,get操作主要有五种模式:

1. 基于超时的简单(stupid)模式

在T到达后,任何线程get操作发现Cache中的Key和对应Value将被清除或标记为不可用,get操作将发起调用远程服务获取Key对应的Value,并更新写回Cache,然后get操作返回新值;如果远程获取Key-Value失败,则get抛出异常。

为了便于理解,举一个码头工人取货的例子:5个工人(线程)去港口取同样Key的货(get),发现货已经过期被扔掉了,这时5个工人各自分别去对岸取新货,然后返回。

2. 基于超时的常规模式

在T到达后,Cache中的Key和对应Value将被清除或标记为不可用,get操作将调用远程服务获取Key对应的Value,并更新写回Cache;此时,如果另一个线程发现Key和Value已经不可用,get操作还需要判断有没有其他线程发起了远程调用,如果有,那么自己就等待,直到那个线程远程获取操作成功,Cache中得Key变得可用,get操作返回新的Value。如果远程获取操作失败,则get操作抛出异常,不会返回任何Value。

还是码头工人的例子:5个工人(线程)去港口取同样Key的货(get),发现货已经过期被扔掉了,那么只需派出一个人去对岸取货,其他四个人在港口等待即可,而不用5个人全去。

基于超时的简单模式和常规模式区别在于对于同一个超时的Key,前者每个get线程一旦发现Key不存在,则发起远程调用获取值;而后者每个get线程发现Key不存在,则还要判断当前是否有其他线程已经发起了远程调用操作获取新值,如果有,自己就简单的等待即可。

显然基于超时的常规模式比基于超时的简单模式更加优化,减少了超时时并发访问后端的调用量。

实现基于超时的常规模式就需要用到经典的Double-checked locking惯用法了。

3. 基于刷新的简单(stupid)模式

在T到达后,Cache中的Key和相应Value不动,但是如果有线程调用get操作,将触发refresh操作,根据get和refresh的同步关系,又分为两种模式:

  • 同步模式:任何线程发现Key过期,都触发一次refresh操作,get操作等待refresh操作结束,refresh结束后,get操作返回当前Cache中Key对应的Value。注意refresh操作结束并不意味着refresh成功,还可能抛了异常,没有更新Cache,但是get操作不管,get操作返回的值可能是旧值。
  • 异步模式:任何线程发现Key过期,都触发一次refresh操作,get操作触发refresh操作,不等refresh完成,直接返回Cache中的旧值。

举上面码头工人的例子说明基于刷新的常规模式:这次还是5工人去港口取货,这时货都在,但是已经旧了,这时5个工人有两种选择:

  • 5个人各自去远程取新货,如果取货失败,则拿着旧货返回(同步模式)
  • 5个人各自通知5个雇佣工去取新货,5个工人拿着旧货先回(异步模式)

4. 基于刷新的常规模式

在T到达后,Cache中的Key和相应Value都不会被清除,而是被标记为旧数据,如果有线程调用get操作,将触发refresh更新操作,根据get和refresh的同步关系,又分为两种模式:

  • 同步模式:get操作等待refresh操作结束,refresh结束后,get操作返回当前Cache中Key对应的Value,注意:refresh操作结束并不意味着refresh成功,还可能抛了异常,没有更新Cache,但是get操作不管,get操作返回的值可能是旧值。如果其他线程进行get操作,Key已经过期,并且发现有线程触发了refresh操作,则自己不等refresh完成直接返回旧值。
  • 异步模式:get操作触发refresh操作,不等refresh完成,直接返回Cache中的旧值。如果其他线程进行get操作,发现Key已经过期,并且发现有线程触发了refresh操作,则自己不等refresh完成直接返回旧值。

再举上面码头工人的例子说明基于刷新的常规模式:这次还是5工人去港口取货,这时货都在,但是已经旧了,这时5个工人有两种选择:

  • 派一个人去远方港口取新货,其余4个人拿着旧货先回(同步模式)。
  • 5个人通知一个雇佣工去远方取新货,5个人都拿着旧货先回(异步模式)。

基于刷新的简单模式和基于刷新的常规模式区别就在于取数线程之间能否感知当前数据是否正处在刷新状态,因为基于刷新的简单模式中取数线程无法感知当前过期数据是否正处在刷新状态,所以每个取数线程都会触发一个刷新操作,造成一定的线程资源浪费。

而基于超时的常规模式和基于刷新的常规模式区别在于前者过期数据将不能对外访问,所以一旦数据过期,各线程要么拿到数据,要么抛出异常;后者过期数据可以对外访问,所以一旦数据过期,各线程要么拿到新数据,要么拿到旧数据。

5. 基于刷新的续费模式

该模式和基于刷新的常规模式唯一的区别在于refresh操作超时或失败的处理上。在基于刷新的常规模式中,refresh操作超时或失败时抛出异常,Cache中的相应Key-Value还是旧值,这样下一个get操作到来时又会触发一次refresh操作。

在基于刷新的续费模式中,如果refresh操作失败,那么refresh将把旧值当成新值返回,这样就相当于旧值又被续费了T时间,后续T时间内get操作将取到这个续费的旧值而不会触发refresh操作。

基于刷新的续费模式也像常规模式那样分为同步模式和异步模式,不再赘述。

下面讨论这5种Cache get模式在服务过载发生时的表现,首先假设如下:

  • 假设A系统的访问量为每分钟M次。
  • 假设Cache能存Key为C个,并且Key空间有N个。
  • 假设正常状态下,B系统访问量为每分钟W次,显然W\<\<M。

这时因为某种原因,比如B长时间故障,造成Cache中得Key全部过期,B系统这时从故障中恢复,五种get模式分析表现分析如下:

  1. 在基于超时和刷新的简单模式中,B系统的瞬间流量将达到和A的瞬时流量M大体等同,相当于Cache被击穿。这就发生了服务过载,这时刚刚恢复的B系统将肯定会被大流量压垮。
  2. 在基于超时和刷新的常规模式中,B系统的瞬间流量将和Cache中Key空间N大体等同。这时是否发生服务过载,就要看Key空间N是否超过B系统的流量上限了。
  3. 在基于刷新的续费模式中,B系统的瞬间流量为W,和正常情况相同而不会发生服务过载。实际上,在基于刷新的续费模式中,不存在Cache Key全部过期的情况,就算把B系统永久性地干掉,A系统的Cache也会基于旧值长久的平稳运行。

第3点,B系统不会发生服务过载的主要原因是基于刷新的续费模式下不会出现chache中的Key全部长时间过期的情况,即使B系统长时间不可用,基于刷新的续费模式也会在一个过期周期内把旧值当成新值继续使用。所以当B系统恢复时,A系统的Cache都处在正常工作状态。

从B系统的角度看,能够抵抗服务过载的基于刷新的续费模式最优。

从A系统的角度看,由于一般情况下A系统是一个高访问量的在线web应用,这种应用最讨厌的一个词就是“线程等待”,因此基于刷新的各种异步模式较优。

综合考虑,基于刷新的异步续费模式是首选

然而凡是有利就有弊,有两点需要注意的地方:

  1. 基于刷新模式最大的缺点是Key-Value一旦放入Cache就不会被清除,每次更新也是新值覆盖旧值,JVM GC永远无法对其进行垃圾收集,而基于超时的模式中,Key-Value超时后如果新的访问没有到来,内存是可以被GC垃圾回收的。所以如果你使用的是寸土寸金的本地内存做Cache就要小心了。
  2. 基于刷新的续费模式需要做好监控,不然有可能Cache中的值已经和真实的值相差很远了,应用还以为是新值而使用。

关于具体的Cache,来自Google的Guava本地缓存库支持上文的第二种、第四种和第五种get操作模式。

但是对于Redis等分布式缓存,只提供原始的get、set方法,而提供的get仅仅是获取,与上文提到的五种get操作模式不是一个概念。开发者想用这五种get操作模式的话不得不自己封装和实现。

五种get操作模式中,基于超时和刷新的简单模式是实现起来最简单的模式,但遗憾的是这两种模式对服务过载完全无免疫力,这可能也是服务过载在大量依赖缓存的系统中频繁发生的一个重要原因吧。

本文之所以把第1、3种模式称为stupid模式,是想强调这种模式应该尽量避免,Guava里面根本没有这种模式,而Redis只提供简单的读写操作,很容易就把系统实现成了这种方式。

应对分布式Cache宕机

如果是Cache直接挂了,那么就算是基于刷新的异步续费模式也无能为力了。这时A系统铁定无法对Cache进行存取操作,只能将流量完全打到B系统,B系统面对服务过载在劫难逃……

本节讨论的预防Cache宕机仅限于分布式Cache,因为本地Cache一般和A系统应用共享内存和进程,本地Cache挂了A系统也挂了,不会出现本地Cache挂了而A系统应用正常的情况。

首先,A系统请求线程检查分布式Cache状态,如果无应答则说明分布式Cache挂了,则转向请求B系统,这样一来大流量将压垮B系统。这时可选的方案如下:

  1. A系统的当前线程不请求B系统,而是打个日志并设置一个默认值。
  2. A系统的当前线程按照一定概率决定是否请求B系统。
  3. A系统的当前线程检查B系统运行情况,如果良好则请求B系统。

方案1最简单,A系统知道如果没有Cache,B系统可能扛不住自己的全部流量,索性不请求B系统,等待Cache恢复。但这时B系统利用率为0,显然不是最优方案,而且当请求的Value不容易设置默认值时,这个方案就不行了。

方案2可以让一部分线程请求B系统,这部分请求肯定能被B系统hold住。可以保守的设置这个概率 u =(B系统的平均流量)/(A系统的峰值流量)

方案3是一种更为智能的方案,如果B系统运行良好,当前线程请求;如果B系统过载,则不请求,这样A系统将让B系统处于一种宕机与不宕机的临界状态,最大限度挖掘B系统性能。这种方案要求B系统提供一个性能评估接口返回Yes和No,Yes表示B系统良好,可以请求;No表示B系统情况不妙,不要请求。这个接口将被频繁调用,必须高效。

方案3的关键在于如何评估一个系统的运行状况。一个系统中当前主机的性能参数有CPU负载、内存使用率、Swap使用率、GC频率和GC时间、各个接口平均响应时间等,性能评估接口需要根据这些参数返回Yes或者No,是不是机器学习里的二分类问题?😄关于这个问题已经可以单独写篇文章讨论了,在这里就不展开了,你可以想一个比较简单傻瓜的保守策略,缺点是A系统的请求无法很好的逼近B系统的性能极限。

综合以上分析,方案2比较靠谱。如果选择方案3,建议由专门团队负责研究并提供统一的系统性能实时评估方案和工具。

应对分布式Cache宕机后的恢复

不要以为成功hold住分布式Cache宕机就万事大吉了,真正的考验是分布式Cache从宕机过程恢复之后,这时分布式Cache中什么都没有。

即使是上文中提到了基于刷新的异步续费策略这时也没用,因为分布式Cache为空,无论如何都要请求B系统。这时B系统的最大流量是Key的空间取值数量。

如果Key的取值空间数量很少,则相安无事;如果Key的取值空间数量大于B系统的流量上限,服务过载依然在所难免。

这种情况A系统很难处理,关键原因是A系统请求Cache返回Key对应Value为空,A系统无法知道是因为当前Cache是刚刚初始化,所有内容都为空;还是因为仅仅是自己请求的那个Key没在Cache里

如果是前者,那么当前线程就要像处理Cache宕机那样进行某种策略的回避;如果是后者,直接请求B系统即可,因为这是正常的Cache使用流程。

对于Cache宕机的恢复,A系统真的无能为力,只能寄希望于B系统的方案了。

Server端的方案

相对于Client端需要应对各种复杂问题,Server端需要应对的问题非常简单,就是如何从容应对过载的问题。无论是缓存击穿也好,还是拒绝服务攻击也罢,对于Server端来说都是过载保护的问题。对于过载保护,主要给出两种可行方案,以及一种比较复杂的方案思路。

流量控制

流量控制就是B系统实时监控当前流量,如果超过预设的值或者系统承受能力,则直接拒绝掉一部分请求,以实现对系统的保护。

流量控制根据基于的数据不同,可分为两种:

  1. 基于流量阈值的流控:流量阈值是每个主机的流量上限,流量超过该阈值主机将进入不稳定状态。阈值提前进行设定,如果主机当前流量超过阈值,则拒绝掉一部分流量,使得实际被处理流量始终低于阈值。
  2. 基于主机状态的流控:每个接受每个请求之前先判断当前主机状态,如果主机状况不佳,则拒绝当前请求。

基于阈值的流控实现简单,但是最大的问题是需要提前设置阈值,而且随着业务逻辑越来越复杂,接口越来越多,主机的服务能力实际应该是下降的,这样就需要不断下调阈值,增加了维护成本,而且万一忘记调整的话,呵呵……

主机的阈值可以通过压力测试确定,选择的时候可以保守些。

基于主机状态的流控免去了人为控制,但是其最大的确定上文已经提到:如何根据当前主机各个参数判断主机状态呢?想要完美的回答这个问题目测并不容易,因此在没有太好答案之前,我推荐基于阈值的流控。

流量控制基于实现位置的不同,又可以分为两种:

  1. 反向代理实现流控:在反向代理如Nginx上基于各种策略进行流量控制。这种一般针对HTTP服务。
  2. 借助服务治理系统:如果Server端是RMI、RPC等服务,可以构建专门的服务治理系统进行负载均衡、流控等服务。
  3. 服务容器实现流控:在应用代码里,业务逻辑之前实现流量控制。

第3种在服务器的容器(如Java容器)中实现流控并不推荐,因为流控和业务代码混在一起容易混乱;其次实际上流量已经全量进入到了业务代码里,这时的流控只是阻止其进入真正的业务逻辑,所以流控效果将打折;还有,如果流量策略经常变动,系统将不得不为此经常更改。

因此,推荐前两种方式。

最后提一个注意点:当因为流控而拒绝请求时,务必在返回的数据中带上相关信息(比如“当前请求因为超出流量而被禁止访问”),如果返回值什么都没有将是一个大坑。因为造成调用方请求没有被响应的原因很多,可能是调用方Bug,也可能是服务方Bug,还可能是网络不稳定,这样一来很可能在排查一整天后发现是流控搞的鬼……

服务降级

服务降级一般由人为触发,属于服务过载造成崩溃恢复时的策略,但为了和流控对比,将其放到这里。

流量控制本质上是减小访问量,而服务处理能力不变;而服务降级本质上是降低了部分服务的处理能力,增强另一部分服务处理能力,而访问量不变。

服务降级是指在服务过载时关闭不重要的接口(直接拒绝处理请求),而保留重要的接口。比如服务由10个接口,服务降级时关闭了其中五个,保留五个,这时这个主机的服务处理能力将增强到二倍左右。

然而,服务过载发生时动辄就超出系统处理能力10倍,而服务降级能使主机服务处理能力提高10倍么?显然很困难,因此服务过载的应对不能只依靠服务降级策略。

动态扩展

动态扩展指的是在流量超过系统服务能力时,自动触发集群扩容,自动部署并上线运行;当流量过去后又自动回收多余机器,完全弹性。

这个方案是不是感觉很不错。但是目前互联网公司的在线应用跑在云上的本身就不多,要完全实现在线应用的自动化弹性运维,要走的路就更多了。

崩溃恢复

如果服务过载造成系统崩溃还是不幸发生了,这时需要运维控制流量,等后台系统启动完毕后循序渐进的放开流量,主要目的是让Cache慢慢预热。流量控制刚开始可以为10%,然后20%,然后50%,然后80%,最后全量,当然具体的比例,尤其是初始比例,还要看后端承受能力和前端流量的比例,各个系统并不相同。

如果后端系统有专门的工具进行Cache预热,则省去了运维的工作,等Cache热起来再发布后台系统即可。但是如果Cache中的Key空间很大,开发预热工具将比较困难。

结论

“防患于未然”放在服务过载的应对上也是适合的,预防为主,补救为辅。综合上文分析,具体的预防要点如下:

  1. 调用方(A系统)采用基于刷新的异步续费模式使用Cache,或者至少不能使用基于超时或刷新的简单(stupid)模式。
  2. 调用方(A系统)每次请求Cache时检查Cache是否可用(available),如果不可用则按照一个保守的概率访问后端,而不是无所顾忌的直接访问后端。
  3. 服务方(B系统)在反向代理处设置流量控制进行过载保护,阈值需要通过压测获得。

崩溃的补救主要还是靠运维和研发在发生时的通力合作:观察流量变化准确定位崩溃原因,运维控流量研发持续关注性能变化。

未来如果有条件的话可以研究下主机应用健康判断问题和动态弹性运维问题,毕竟自动化比人为操作要靠谱。

原文出自:https://tech.meituan.com/archives

全球互联网技术架构,前沿架构参考

联系我们博客/网站内容提交