若谷学院
互联网公司技术架构分享

实时数据产品实践——美团大交通战场沙盘

美团点评阅读(2726)

背景

大数据时代,数据的重要性不言而喻,尤其对于互联网公司,随着业务的快速变化,商业模式的不断创新、用户体验个性化、实时化需求日益突出,海量数据实时处理在商业方面的需求越来越大。如何通过数据快速分析出用户的行为,以便做出准确的决策,越来越体现一个公司的价值。现阶段对于实时数据的建设比较单一,主要存在以下问题:

  1. 实时仓库建设不足,维度及指标不够丰富,无法快速满足不同业务需求。
  2. 实时数据和离线数据对比不灵活,无法自动化新增对比基期数据,且对比数据无法预先生产。
  3. 数据监控不及时,一旦数据出现问题而无法及时监控到,就会影响业务分析决策。
    因此,本文将基于美团点评大交通实时数据产品,从面临的挑战、总体解决方案、数据设计架构、后台设计架构等几个方面,详细介绍实时数据系统的整体建设思路。

挑战

实时流数据来源系统较多,处理非常复杂,并且不同业务场景对实时数据的要求不同,因此在建设过程主要有以下挑战:

  1. 如何在保证数据准确性的前提下实现多实时流关联;实时流出现延迟、乱序、重复时如何解决。
    流式计算中通常需要将多个实时流按某些主键进行关联得到特定的实时数据,但不同于离线数据表关联,实时流的到达是一个增量的过程,无法获取实时流的全量数据,并且实时流的达到次序无法确定,因此在进行关联时需要考虑存储一些中间状态及下发策略问题。
  2. 实时流可复用性,实时流的处理不能只为解决一个问题,而是一类甚至几类问题,需要从业务角度对数据进行抽象,分层建设,以快速满足不同场景下对数据的要求。
  3. 中台服务如何保证查询性能、数据预警及数据安全。
    实时数据指标维度较为丰富,多维度聚合查询场景对服务层的性能要求较高,需要服务层能够支持较快的计算能力和响应能力;同时数据出现问题后,需要做好及时监控并快速修复。
  4. 如何保证产品应用需求个性化。
    实时数据与离线数据对比不灵活,需要提供可配置方案,并能够及时生产离线数据。

解决思路

我们在充分梳理业务需求的基础上,重新对实时流进行了建设,将实时数据分层建模,并对外提供统一的接口,保证数据同源同口径;同时,在数据服务层,增加可配置信息模块解决了配置信息不能自动化的问题,在数据处理策略上做了多线程处理、预计算、数据降级等优化,在数据安全方面增加数据审计功能,更好地提升了产品的用户体验。

总体方案

产品整体建设方案基于美团点评技术平台,总共分为源数据层、存储层、服务层及WEB层,整体架构如下所示:


图1 整体架构图

源数据层:主要提供三部分数据,实时数据、离线数据、配置信息、维度信息。
存储层:源数据清洗后放入相应的存储引擎中,为服务层提供数据服务。
服务层:提供三部分功能,数据API服务、预计算服务、权限服务、数据审计服务。
Web层:使用Echarts可视化数据。

数据层

数据架构

依托于美团点评提供的公共资源平台,数据架构按功能分为数据采集、数据处理、数据存储、数据服务四层,如下所示:


图2 数据架构图

数据采集

数据来源主要有两种:业务上报的Log日志及数据库Binlog日志。这些日志通过美团点评日志中心进行采集后存储在消息中间件Kafka中,并按照不同的分类存储在不同的Topic中,供下游订阅。

数据处理

数据处理顾名思义,就是对采集的实时流进行逻辑处理,按业务需求输出对应的实时数据,因此这一步骤是流式计算的关键,分两步进行:数据加工、数据推送。

数据加工:数据加工通常需要在流式计算系统中进行,目前流行的流式处理系统主要有Storm、Spark Streaming系统及Flink系统,这些系统都能在不同的应用场景下发挥很好处理能力,并各有优缺点,如下图所示:

计算框架吞吐量延迟传输保障处理模式成熟度
Storm毫秒级At least once单条处理成熟
Spark Streaming秒级Exactly once微批处理成熟
Flink毫秒级Exactly once单条处理/微批处理新兴

最终我们选择Storm作为实时数据处理框架,并借助公司提供的通用组件来简化拓扑开发流程和重复代码编码。例如,组件MTSimpleLogBolt的主要功能是将Kafka中读取的数据(Log or Binlog)解析成Java实体对象;组件StormConfHelper的功能是获取Storm作业应用配置信息。

数据推送:将处理好的数据推送到存储引擎中。

数据存储

数据加工完成后会被存储到实时存储引擎中,以提供给下游使用。目前常用的存储引擎主要有MySQL、Druid、Elasticsearch、Redis、Tair比较如下:

存储引擎优点缺点
MySQL使用简单,支持数据量小数据量大,对MySQL的压力大,查询性能慢
Druid数据预计算不支持精确查询
Elasticsearch查询效率快,支持常用聚合操作;可以做到精确去重查询条件受限
Redis内存存储KV,查询效率高写入资源有限,不支持大数据量写入
Tair持久化和非持久化两种缓存,查询效率高单节点性能比Redis较弱
Kylin多维查询预计算不支持实时

综上比较,由于实时数据量较大,且数据精度要求较高,因此我们最终选择交易存储使用ES,流量存储使用Druid,维度存储使用Tair,中间数据存储使用Redis;而离线数据,我们采用Hive和Kylin存储。

数据服务

将存储引擎数据统一对外提供查询服务,支持不同业务应用场景。

具体实现

实时流处理流程

整个数据层架构上主要分为实时数据和离线数据两部分:实时数据分为交易的Binlog日志和流量的Log日志,经过Strom框架处理后写入Kafka,再经过DataLinkStreaming分别写入ES和Druid;离线数据通过Hive处理写入Kylin。


图3 产品数据架构

下图所示为一条消息的处理流程:


图4 数据关系

两个Topic分别是order_base(主要存放订单基本信息:订单id、订单状态、支付时间、票量、金额等);order_biz(主要存放订单的扩展信息:订单id、订单类型、出发时间、到达时间、出发城市、到达城市)。我们最终要拿到一条包括上述全部内容的一条记录。


图5 数据处理流程

具体例子:Bolt在处理一条记录时,首先判断这条记录是base还是biz,如果是base则写入缓存中base的Category中,如果是biz则写入biz的Category中。以order_id为Key,如果是base则去和biz关联,如果biz存在则代表能够关联上,这时发送关联后的完整数据,同时删除该主键(order_key)记录;如果biz中不存在,则说明没关联上,这时可能biz的数据延迟或者是丢失,为了保证主数据的准确性,这时我们只发送base的数据,缓存中的数据保留不被删除。如果这条消息是biz,则首先会更新缓存中该主键的biz记录,然后去和base关联,关联上则发送同时删除base中数据,否则不发送。此时我们会根据ES的Update特性去更新之前的数据。从现实效果来看保证了99.2%的数据完整性,符合预期。

数据写入

在Topic2es的数据推送中,通过DataLinkString工具(底层Spark Streaming)实现了Kafka2es的微批次同步,一方面通过多并发batch写入ES获得了良好的吞吐,另一方面提供了5秒的实时写入效率,保证了ES查询的实时可见。同时我们也维护了Kafka的Offset,可以提供At lease once的同步服务,并结合ES的主键,可以做到Exactly once,有效解决了数据重复问题。

ES索引设计及优化

在数据写入ES过程中,由于数据量大,索引时间区间长,在建设索引时需要考虑合理设计保证查询效率,因此主要有以下三点优化:

  • 写入优化 在通过DataLinkString写入ES时,在集群可接受的范围内,数据Shuffle后再分组,增加Client并发数,提升写入效率。
  • 数据结构化 根据需要设计了索引的模版,使用了最小的足够用的数据类型。
  • 按天建索引 通过模版按天建索引,避免影响磁盘IO效率,同时通过别名兼容搜索一致性。
  • 设置合理的分片和副本数 如果分片数过少或过多都会导致检索比较慢。分片数过多会导致检索时打开比较多的文件,另外也会影响多台服务器之间通讯。而分片数过少为导至单个分片索引过大,所以检索速度慢。在确定分片数之前需要进行单服务单索引单分片的测试。 我们根据 索引分片数=数据总量/单分片数 设置了合理的分片数。

实时数据仓库模型

整个实时数据开发遵循大交通实时数仓的分层设计,在此也做一下简单介绍,实时数仓架构如下:

图6 实时数仓架构

ODS层:包含美团页面流量日志、模块事件日志以及用户操作的Binlog信息日志,是直接从业务系统采集过来的原始数据。
事实明细层:根据主题和业务过程,生成订单事实和流量事实。
汇总层:对明细层的数据扩展业务常用的维度信息,形成主题宽表。
App层:针对不同应用在汇总层基础上加工扩展的聚合数据,如火车票在抢票业务下的交易数据汇总信息。

规范建模后,业务需求来临时,只需要在App层建模即可,底层数据统一维护。

中台服务层

后台服务主要实现 登陆验证和权限验证(UPM)、指标逻辑计算和API、预计算服务、数据质量监控、数据审计功能。由于数据量大且实时性要求较高,在实现过程遇到如下挑战:

  • 如何保证查询响应性能。
  • 服务发生故障后,数据降级方案。
  • 数据监控预警方案及数据出现问题解决方案。

针对以上问题,下面进行一一详述:

性能响应优化

服务层处理数据过程中,由于数据量大,在查询时需要一定的响应时间,所以在保证响应性能方面,主要做了以下优化:


图7 性能响应优化

  1. 项目初始由于数据量不是很大,采用单线程直接查询ES,能够满足需求。
  2. 随着节假日来临,数据量大增,并行查询人次增多,查询响应变慢,无法快速响应结果,因此引入缓存技术,将中间结果进行缓存,并在缓存有效期内,直接读取缓存数据大大提高了时间效率;并且在此基础上,引入Master-Worker多线程模式,将多指标查询拆分,并行查询ES,使得查询响应大大提高。
  3. 虽然问题得到解决,但仍存在一个问题,就是每次都是现查ES及部分中间缓存结果,尤其是第一次查询,需要完全走ES,这样就会让第一个查询数据的用户体验较差,因此引入预计算服务,通过定时调度任务,将部分重要维度下的指标进行预计算放入缓存,用户查询时直接读取缓存数据。而一些不常用的维度下的数据,采用的策略是,第一个用户查询时现查ES,并将结果数据预加载到缓存,后续所有用户再次查询直接读缓存数据,这样既能保证用户体验,也不至于占用太多缓存空间。

数据降级方案

使用缓存避免不了出现一些问题,比如缓存失效、缓存雪崩等问题,针对缓存雪崩问题,通过设置不同Key的过期时间能够很好的解决;而对于缓存数据失效,我们有自己的数据降级方案,具体方案如下:


图8 数据降级方案

预计算数据会分别在Redis、Tair和本地缓存中存储一份以保证查询效率,当查询Redis数据不存在时,会去Tair中读取数据,Tair也为空时,会读取本地缓存,只有当本地缓存数据也为空时,才会现查ES做聚合计算,这样也会降低ES的查询压力。

数据监控

实时监控预警非常重要,在数据出现问题时,一方面能够及时通知我们快速定位修复数据,另一方面也能够及时周知业务同学,避免做出错误分析。基于此,我们做了两方面的实时监控,其一是对源实时流在Storm处理层面的监控,确保源实时流正确生产;其二是对展示的汇总数据进行监控,确保产品展示指标数据正常。
针对数据出现问题预警,我们在解决方案上规范了流程:

  1. 监控报警机制及时周知相关同学。
  2. 第一时间通过产品上方的黄条提示用户哪些数据异常。
  3. 快速定位问题,给出修复方案。
    目前对于实时异常数据的修补,主要有两种方法:
    a. 针对特殊情况的数据修补方案第一灵活指定Offset,重新消费Kafka数据。
    b. 预留了Hive2es的准实时重导功能,确保生产数据的准确和完整。

数据安全

在以数据取胜的时代,数据的安全不言而喻,我们采用公司提供的UPM权限接口进行二级权限管理并加入审计功能及水印功能,能够准确记录用户的所有访问以及操作记录,并且将日志数据格式化到数据库中,进行实时监控分析。

总结

实时数据可以为业务特定场景分析决策提供巨大支持,尤其对于大交通节假日及春运期间。在大交通实时战场沙盘产品化过程中,我们投入了大量的思考和实践,主要取得以下收益:

  1. 可视化的产品,为业务方实时分析提供极大便利,取得较好的反馈。
  2. 优化实时数据仓库建设,合理分层建模,规范命名设计,统一维度建设和指标口径,对外提供统一接口,保证数据规范准确。
  3. 在Storm框架下实时开发和数据写入方面积累了一定的经验。
  4. 服务层支持可配置信息,可以灵活配置个性化信息。
  5. 服务层性能及获取数据策略的优化,为用户带来更好的产品体验。

加入我们

最后插播一个招聘广告,我们是一群擅长大数据领域数据建设、数仓建设、数据治理及数据BI应用建设的工程师,期待更多能手加入,感兴趣的可以投递个人简历到邮箱:yangguang09#meituan.com,欢迎您的加入。

作者介绍

娣娣,美团点评数据开发工程师,2015年加入美团,从事数据仓库建设、大数据产品开发工作。
晓磊,美团点评数据开发工程师,2017年加入美团,从事数据仓库建设、大数据产品开发工作。

原文出自:https://tech.meituan.com/archives

分布式块存储系统Ursa的设计与实现

美团点评阅读(2805)

引言

云硬盘对IaaS云计算平台有至关重要的作用,几乎已成为必备组件,如亚马逊的EBS(Elastic Block Store)、阿里云的盘古、OpenStack中的Cinder等。云硬盘可为云计算平台带来许多优良特性,如更高的数据可靠性和可用性、灵活的数据快照功能、更好的虚拟机动态迁移支持、更短的主机故障恢复时间等等。随着万兆以太网逐渐普及,云硬盘的各项优势得到加强和凸显,其必要性变得十分强烈。

云硬盘的底层通常是分布式块存储系统,目前开源领域有一些此类项目,如Ceph RBD、Sheepdog。另外MooseFS和GlusterFS虽然叫做文件系统,但由于其特性与块存储系统接近,也能用于支持云硬盘。我们在测评中发现,这些开源项目均存在一些问题,使得它们都难以直接应用在大规模的生产系统当中。例如Ceph RBD的效率较低(CPU使用过高);Sheepdog在压力测试中出现了数据丢失的现象;MooseFS的POSIX语义支持、基于FUSE的架构、不完全开源的2.0版本等问题给它自身带来了许多的局限性;GlusterFS与Ceph同属红帽收购的开源存储系统,主要用于scale-out文件存储场景,在云计算领域使用不多。此外,这些存储系统都难以充发挥用万兆网卡和SSD的性能潜力,难以在未来承担重任。

由于以上原因,美团云研发了全新的分布式块存储系统Ursa,通过简单稳固的系统架构、高效的代码实现以及对各种非典型场景的仔细考虑,实现了高可靠、高可用、高性能、低开销、可扩展、易运维、易维护等等目标。Ursa的名字源于DotA中的熊战士,他具有极高的攻击速度、攻击力和生命值,分别隐喻存储系统中的IOPS、吞吐率和稳定性。

分布式块存储相关项目与技术

2.1 Ceph

(主要参考:https://www.ustack.com/blog/ceph_infra/

Ceph项目起源于其创始人Sage Weil在加州大学Santa Cruz分校攻读博士期间的研究课题。项目的起始时间为2004年。在2006年的OSDI学术会议上,Sage发表了关于Ceph的论文,并提供了项目的下载链接,由此开始广为人知。2010年Ceph客户端部分代码正式进入Linux kernel 2.6.34。

Ceph同时提供对象、块和文件这三个层次的分布式存储服务,其中只有块层存储与我们相关。由于块存储在IaaS云计算系统中占有重要地位,Ceph在近些年的关注度得到显著提高。许多云计算系统实例基于Ceph提供块存储服务,如UnitedStack、Mirantis OpenStack等。

Ceph性能测试

  • 测试版本:0.81
  • 操作系统:CentOS 6.x
  • 测试工具:fio
  • 服务器配置:
    • CPU: Intel Xeon E5-2650v2 @ 2.6GHz
    • RAM: 96GB
    • NIC: 10 GbE
    • HDD: 6 NL SAS, 7200 RPM
    • RAID Controller: Dell H710p (LSI 2208 with 1GB NVRAM)
  • 服务器数量:4,其中一个为兼职客户端

注意:由于客户端位于一个存储服务器上,所以有1/4的吞吐率不经过网卡。

测试结果如下:

  • 读IOPS:16 407(此时客户端CPU占用率超过500%,5台服务器CPU总使用率接近500%)
  • 写IOPS:941
  • 顺序读吞吐率:218 859 KB/s
  • 顺序写吞吐率:67 242 KB/s
  • 顺序读延迟:1.6ms (664 IOPS)
  • 顺序写延迟:4.4ms (225 IOPS)
  • 网络ping值:0.1324ms
  • 本地硬盘顺序读写延迟:0.03332ms (29 126 IOPS)

从测试来看,Ceph的读吞吐率正常,然而写吞吐率不足读的1/3,性能偏低;读写延迟比显著大于网络延迟与磁盘I/O延迟之和;CPU占用率过高。

2.2 Sheepdog

(主要参考:http://peterylh.blog.163.com/blog/static/12033201221594937257/

Sheepdog是由日本NTT实验室的Morita Kazutaka专为虚拟化平台创立的分布式块存储开源项目, 于2009年开源[1]。从2011年9月开始, 一些淘宝的工程师加入了Sheepdog项目,以及相关开源项目比如Corosync、Accord的开发。Sheepdog主要由两部分组成:集群管理和存储服务,其中集群管理目前使用Corosync或者Zookper来完成,存储服务是全新实现的。

Sheepdog采用无中心节点的全对称架构,基于一致性哈希实现从ObjectID到存储节点的定位:每个节点划分成多个虚拟节点,虚拟节点和ObjectID一样,采用64位整数唯一标识,每个虚拟节点负责一段包含节点ID在内的ObjectID区间。DataObject副本存在ObjectID对应的虚拟节点,及在后续的几个节点上。Sheepdog无单点故障问题,存储容量和性能均可线性扩展,新增节点通过简单配置即可加入集群,并且Sheepdog自动实现负载均衡,节点故障时可自动发现并进行副本修复,还直接支持QEMU/KVM。

Sheepdog的服务进程既承担数据服务的职责,同时也是客户端(QEMU)访问数据的gateway。QEMU的Sheepdog driver将对volume的请求转换成为对object的请求,然后通过UNIX domain socket或者TCP socket连接一个Sheepdog服务进程,并将访问请求发给该进程来完成后续步骤。Sheepdog的服务进程还可以开启数据缓存功能,以减少网络I/O。Sheepdog的I/O路径是“client<->gateway<->object manager(s)”,读操作可以在任意副本完成,更新操作并行的发往所有副本, 当所有副本都更新成功之后,gateway才告诉客户端更新操作成功。

Sheepdog的数据可靠性问题

我们对Sheepdog开展了可靠性、可用性测试。在测试中有共3台服务器,每台配有6个机械硬盘,配置好Sheepdog之后,每台服务器启动10个VM,每个VM内无限循环运行fio分别执行小块随机读、写和大块顺序读、写的测试。

在执行压力测试一周后,对集群中的全部数据进行一致性检测(collie cluster check),发现有些数据块副本与另外2个不一致(“fixed replica …”),有些数据块的3个各不相同(“no majority of …”):

[root@node3-10gtest ~]# collie cluster check
fix vdi test1-3
99.9 % [=================================================================>] 50 GB / 50 GB 
fixed replica 3e563000000fca
99.9 % [=================================================================>] 50 GB / 50 GB      
fixed replica 3e563000000fec
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e5630000026f5
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e563000002da6
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e563000001e8c
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e563000001530
...
fix vdi test2-9
50.9 % [=================================>                                ] 25 GB / 50 GB      
no majority of d781e300000123
51.0 % [===================================>                              ] 26 GB / 50 GB      
no majority of d781e300000159
51.2 % [===================================>                              ] 26 GB / 50 GB      
no majority of d781e30000018a
53.2 % [====================================>                             ] 27 GB / 50 GB      
…

2.3 MooseFS

(主要参考:http://peterylh.blog.163.com/blog/static/12033201251791139592/

MooseFS是容错的分布式文件系统,通过FUSE支持标准POSIX文件系统接口。 MooseFS的架构类似于GFS,由四个部分组成:

  • 管理服务器Master:类似于GFS的Master,主要有两个功能:(1)存储文件和目录元数据,文件元数据包括文件大小、属性、对应的Chunk等;(2)管理集群成员关系和Chunk元数据信息,包括Chunk存储、版本、Lease等。
  • 元数据备份服务器Metalogger Server:根据元数据文件和log实时备份Master元数据。
  • 存储服务器Chunk Server:负责存储Chunk,提供Chunk读写能力。Chunk文件默认为64MB大小。
  • 客户端Client:以FUSE方式挂到本地文件系统,实现标准文件系统接口。

MooseFS本地不会缓存Chunk信息, 每次读写操作都会访问Master, Master的压力较大。此外MooseFS写操作流程较长且开销较高。MooseFS支持快照,但是以整个Chunk为单位进行CoW(Copy-on-Write),可能造成响应时间恶化,补救办法是以牺牲系统规模为代价,降低Chunk大小。

MooseFS基于FUSE提供POSIX语义支持,已有应用可以不经修改直接迁移到MooseFS之上,这给应用带来极大的便利。然而FUSE也带来了一些负面作用,比如POSIX语义对于块存储来说并不需要,FUSE会带来额外开销等等。

2.4 GFS/HDFS

(主要参考:http://www.nosqlnotes.net/archives/119

HDFS基本可以认为是GFS的一个简化开源实现,二者因此有很多相似之处。首先,GFS和HDFS都采用单一主控机+多台工作机的模式,由一台主控机(Master)存储系统全部元数据,并实现数据的分布、复制、备份决策,主控机还实现了元数据的checkpoint和操作日志记录及回放功能。工作机存储数据,并根据主控机的指令进行数据存储、数据迁移和数据计算等。其次,GFS和HDFS都通过数据分块和复制(多副本,一般是3)来提供更高的可靠性和更高的性能。当其中一个副本不可用时,系统都提供副本自动复制功能。同时,针对数据读多于写的特点,读服务被分配到多个副本所在机器,提供了系统的整体性能。最后,GFS和HDFS都提供了一个树结构的文件系统,实现了类似与Linux下的文件复制、改名、移动、创建、删除操作以及简单的权限管理等。

然而,GFS和HDFS在关键点的设计上差异很大,HDFS为了规避GFS的复杂度进行了很多简化。例如HDFS不支持并发追加和集群快照,早期HDFS的NameNode(即Master)没原生HA功能。总之,HDFS基本可以认为是GFS的简化版,由于时间及应用场景等各方面的原因对GFS的功能做了一定的简化,大大降低了复杂度。

2.5 HLFS

(主要参考:http://peterylh.blog.163.com/blog/static/120332012226104116710/

HLFS(HDFS Log-structured File System)是一个开源分布式块存储系统,其最大特色是结合了LFS和HDFS。HDFS提供了可靠、随时可扩展的文件服务,而HLFS通过Log-structured技术弥补了HDFS不能随机更新的缺憾。在HLFS中,虚拟磁盘对应一个文件, 文件长度能够超过TB级别,客户端支持Linux和Xen,其中Linux基于NBD实现,Xen基于blktap2实现,客户端通过类POSIX接口libHLFS与服务端通讯。HLFS主要特性包括多副本、动态扩容、故障透明处理和快照。

HLFS性能较低。首先,非原地更新必然导致数据块在物理上非连续存放,因此读I/O比较随机,顺序读性能下降。其次,虽然从单个文件角度看来,写I/O是顺序的,但是在HDFS的Chunk Server服务了多个HLFS文件,因此从它的角度来看,I/O仍然是随机的。第三,写延迟问题,HDFS面向大文件设计,小文件写延时不够优化。第四,垃圾回收的影响,垃圾回收需要读取和写入大量数据,对正常写操作造成较大影响。此外,按照目前实现,相同段上的垃圾回收和读写请求不能并发,垃圾回收算法对正常操作的干扰较大。

2.6 iSCSI、FCoE、AoE、NBD

iSCSI、FCoE、AoE、NBD等都是用来支持通过网络访问块设备的协议,它们都采用C/S架构,无法直接支持分布式块存储系统。

Ursa的设计与实现

分布式块存储系统给虚拟机提供的是虚拟硬盘服务,因而有如下设计目标:

  • 大文件存储:虚拟硬盘实际通常GB级别以上,小于1GB是罕见情况
  • 需要支持随机读写访问,不需支持追加写,需要支持resize
  • 通常情况下,文件由一个进程独占读写访问;数据块可被共享只读访问
  • 高可靠,高可用:任意两个服务器同时出现故障不影响数据的可靠性和可用性
  • 能发挥出新型硬件的性能优势,如万兆网络、SSD
  • 由于应用需求未知,同时需要优化吞吐率和IOPS
  • 高效率:降低资源消耗,就降低了成本

除了上述源于虚拟硬盘的直接需求意外,分布式块存储还需要支持下列功能:

  • 快照:可给一个文件在不同时刻建立多个独立的快照
  • 克隆:可将一个文件或快照在逻辑上复制成独立的多份
  • 精简配置(thin-provisioning):只有存储数据的部分才真正占用空间

3.1 系统架构

分布式存储系统总体架构可以分为有master(元数据服务器)和无master两大类。有master架构在技术上较为简单清晰,但存在单点失效以及潜在的性能瓶颈问题;无master架构可以消除单点失效和性能瓶颈问题,然而在技术上却较为复杂,并且在数据布局方面具有较多局限性。块存储系统对master的压力不大,同时master的单点故障问题可采用一些现有成熟技术解决,因而美团EBS的总体架构使用有master的类型。这一架构与GFS、HDFS、MooseFS等系统的架构属于同一类型。

如图1所示,美团EBS系统包括M、S和C三个部分,分别代表Master、Chunk Server和Client。Master中记录的元数据包括3种:(1)关于volume的信息,如类型、大小、创建时间、包含哪些数据chunk等等;(2)关于chunk的信息,如大小、创建时间、所在位置等;(3)关于Chunk Server的信息,如IP地址、端口、数据存储量、I/O负载、空间剩余量等。这3种信息当中,关于volume的信息是持久化保存的,其余两种均为暂存信息,通过Chunk Server上报。

I/O Stack

在元数据中,关于volume的信息非常重要,必须持久化保存;关于chunk的信息和Chunk Server的信息是时变的,并且是由Chunk Server上报的,因而没必要持久化保存。根据这样的分析,我们将关于volume的信息保存在MySQL当中,其他元数据保存在Redis当中,余下的集群管理功能由Manager完成。Master == Manager + MySQL + Redis,其中MySQL使用双机主从配置,Redis使用官方提供的标准cluster功能。

3.2 CAP取舍

C、A、P分别代表Consistency、Availability和Partition-tolerance。分布式系统很难同时在这三个方面做到很高的保障,通常要在仔细分析应用需求的基础之上对CAP做出取舍。

块存储系统主要用于提供云硬盘服务,每块云硬盘通常只会挂载到1台VM之上,不存在多机器并发读写的情况,因而其典型应用场景对一致性的需求较低。针对这一特性,我们可以在设计上舍C而取AP。

对于多机并行访问云硬盘的使用模式,若数据是只读的则无需额外处理;若数据有写有读,甚至是多写多读,则需要在上层引入分布式锁,来确保数据一致性和完整性。这种使用模式在SAN领域并不少见,其典型应用场景是多机同时挂载一个网络硬盘,并通过集群文件系统(而不是常见的单机文件系统)来协调访问存储空间。集群文件系统内部会使用分布式锁来确保数据操作的正确性,所以我们舍C的设计决策不会影响多机并行访问的使用模式。

3.3 并发模型

并发(不是并行!)模型的选择和设计无法作为实现细节隐藏在局部,它会影响到程序代码的各个部分,从底层到上层。基本的并发模型只有这样几种:事件驱动、多线程、多进程以及较为小众的多协程。

事件驱动模型是一种更接近硬件工作模式的并发模型,具有更高的执行效率,是高性能网络服务的普遍选择。为能充分发挥万兆网络和SSD的性能潜力,我们必须利用多核心并行服务,因而需要选用多线程或者多进程模型。由于多进程模型更加简单,进程天然是故障传播的屏障,这两方面都十分有利于提高软件的健壮性;并且我们也很容易对业务进行横向拆分,做到互相没有依赖,也不需要交互,所以我们选择了多进程模型,与事件驱动一起构成混合模型。

协程在现实中的应用并不多,很多语言/开发生态甚至不支持协程,然而协程在一些特定领域其实具有更好的适用性。比如,QEMU/KVM在磁盘I/O方面的并发执行完全是由协程负责的,即便某些block driver只提供了事件驱动的接口(如Ceph RBD),QEMU/KVM也会自动把它们转化封装成多协程模式。实践表明,在并发I/O领域,多协程模型可以同时在性能和易用性方面取得非常好的效果,所以我们做了跟QEMU/KVM类似的选择——在底层将事件驱动模型转换成了多协程模型,最终形成了“多进程+多协程+事件驱动”的混合并发模型。

3.4 存储结构

如图所示,Ursa中的存储结构与GFS/HDFS相似,存储卷由64MB(可配置)大小的chunk组成。Ursa系统中的数据复制、故障检测与修复均在chunk层次进行。系统中,每3个(可配置)chunk组成一组,互为备份。每2个chunk组构成一个stripe组,实现条带化交错读写,提高单客户端顺序读写性能。Ursa在I/O栈上层添加Cache模块,可将最常用的数据缓存在客户端本地的SSD介质当中,当访问命中缓存时可大大提高性能。

I/O Stack

3.5 写入策略

最常见的写入策略有两种:(1)客户端直接写多个副本到各个Chunk Server,如图(a)所示,Sheepdog采用此种办法;(2)客户端写第一个副本,并将写请求依次传递下去,如图(b)所示。这两种方法各有利弊:前者通常具有较小的写入延迟,但吞吐率最高只能达到网络带宽的1/3;后者的吞吐率可以达到100%网络带宽,却具有较高的写入延迟。

I/O Stack

由于Ursa可能用于支持各种类型的应用,必须同时面向吞吐率和带宽进行优化,所以我们设计采用了一种分叉式的写入策略:如图(c)所示,客户端使用WRITE_REPLICATE请求求将数据写到第一个副本,称为primary,然后由primary负责将数据分别写到其他副本。这样Ursa可以在吞吐率和延迟两方面取得较好的均衡。为确保数据可靠性,写操作会等所有副本的写操作都完成之后才能返回。

3.6 无状态服务

Chunk Server内部几乎不保存状态,通常情况下各个请求之间是完全独立执行的,并且重启Chunk Server不会影响后续请求的执行。这样的Chunk Server不但具有更高的鲁棒性,而且具有更高的扩展性。许多其他网络服务、协议的设计都遵循了无状态的原则。

3.7 模块

如下图所示,Ursa中的I/O功能模块的组织采用decorator模式,即所有模块都实现了IStore抽象接口,其中负责直接与Chunk Server通信的模块属于decorator模式中的concrete component,其余模块均为concrete decorator。所有的decorator都保存数量不等的指向其他模块的指针(IStore指针)。

I/O Stack

在运行时,Ursa的I/O栈层次结构的对象图如下所示。

I/O Stack

3.8 产品界面

I/O Stack

性能实测

如下图所示,测试环境由万兆以太网、1台Client和3台Chunk Server组成,chunk文件存在tmpfs上,即所有写操作不落盘,写到内存为止。选择tmpfs主要是为了避免硬盘的I/O速度的局限性,以便测试Ursa在极限情况下的表现。

I/O Stack

测试环境的网络延迟如下:

I/O Stack

在上述环境中,用fio分别测试了读写操作的吞吐率、IOPS以及I/O延迟,测试参数与结果如下:

I/O Stack

从测试结果可以看出:
(1). Ursa在吞吐率测试中可以轻易接近网卡理论带宽;
(2). Ursa的IOPS性能可以达到SSD的水准;
(3). Ursa的读延迟接近ping值,写操作需要写3个副本,延迟比读高68%。

作为对比,我们在测试Ceph的过程中监测到服务端CPU占用情况如下:

I/O Stack

此时机器上5个存储服务进程ceph-osd共占用123.7%的CPU资源,提供了4 101的读IOPS服务;而Ursa的服务进程只消耗了43%的CPU资源,提供了61 340读IOPS服务,运行效率比Ceph高43倍。在客户端方面,Ceph消耗了500%+的CPU资源,得到了16 407读IOPS;而Ursa只消耗了96%的CPU资源,得到了61 340读IOPS,运行效率比Ceph高21倍

总结与展望

Ursa从零开始动手开发到内部上线只经历了9个月的时间,虽然基本功能、性能都已达到预期,但仍有许多需要进一步开发的地方。一个重要的方向是对SSD的支持。虽然将HDD简单替换为SSD,不修改Ursa的任何代码、配置就可以运行,并取得性能上的显著改善,然而SSD在性能、成本、寿命等方面与HDD差异巨大,Ursa必须做出针对性的优化才能使SSD扬长避短。另一个重要方向是对大量VM同时启动的更好的支持。如果同时有上百台相同的VM从Ursa启动(即系统盘放在Ursa上),会在短时间内有大量读请求访问相同的数据集(启动文件),形成局部热点,此时相关的Chunk Server服务能力会出现不足,导致启动变慢。由于各个VM所需的启动数据基本相同,我们可以采用“一传十,十传百”的方式组织一个应用层组播树overlay网络来进行数据分发,这样可以从容应对热点数据访问问题。随着一步步的完善,相信Ursa将在云平台的运营当中起到越来越重要的作用。

原文出自:https://tech.meituan.com/archives

Spark在美团的实践

美团点评阅读(2645)

本文已发表在《程序员》杂志2016年4月期。

前言

美团是数据驱动的互联网服务,用户每天在美团上的点击、浏览、下单支付行为都会产生海量的日志,这些日志数据将被汇总处理、分析、挖掘与学习,为美团的各种推荐、搜索系统甚至公司战略目标制定提供数据支持。大数据处理渗透到了美团各业务线的各种应用场景,选择合适、高效的数据处理引擎能够大大提高数据生产的效率,进而间接或直接提升相关团队的工作效率。
美团最初的数据处理以Hive SQL为主,底层计算引擎为MapReduce,部分相对复杂的业务会由工程师编写MapReduce程序实现。随着业务的发展,单纯的Hive SQL查询或者MapReduce程序已经越来越难以满足数据处理和分析的需求。
一方面,MapReduce计算模型对多轮迭代的DAG作业支持不给力,每轮迭代都需要将数据落盘,极大地影响了作业执行效率,另外只提供Map和Reduce这两种计算因子,使得用户在实现迭代式计算(比如:机器学习算法)时成本高且效率低。
另一方面,在数据仓库的按天生产中,由于某些原始日志是半结构化或者非结构化数据,因此,对其进行清洗和转换操作时,需要结合SQL查询以及复杂的过程式逻辑处理,这部分工作之前是由Hive SQL结合Python脚本来完成。这种方式存在效率问题,当数据量比较大的时候,流程的运行时间较长,这些ETL流程通常处于比较上游的位置,会直接影响到一系列下游的完成时间以及各种重要数据报表的生成。
基于以上原因,美团在2014年的时候引入了Spark。为了充分利用现有Hadoop集群的资源,我们采用了Spark on Yarn模式,所有的Spark app以及MapReduce作业会通过Yarn统一调度执行。Spark在美团数据平台架构中的位置如图所示:

 离线计算平台架构图

经过近两年的推广和发展,从最开始只有少数团队尝试用Spark解决数据处理、机器学习等问题,到现在已经覆盖了美团各大业务线的各种应用场景。从上游的ETL生产,到下游的SQL查询分析以及机器学习等,Spark正在逐步替代MapReduce作业,成为美团大数据处理的主流计算引擎。目前美团Hadoop集群用户每天提交的Spark作业数和MapReduce作业数比例为4:1,对于一些上游的Hive ETL流程,迁移到Spark之后,在相同的资源使用情况下,作业执行速度提升了十倍,极大地提升了业务方的生产效率。
下面我们将介绍Spark在美团的实践,包括我们基于Spark所做的平台化工作以及Spark在生产环境下的应用案例。其中包含Zeppelin结合的交互式开发平台,也有使用Spark任务完成的ETL数据转换工具,数据挖掘组基于Spark开发了特征平台和数据挖掘平台,另外还有基于Spark的交互式用户行为分析系统以及在SEM投放服务中的应用,以下是详细介绍。

Spark交互式开发平台

在推广如何使用Spark的过程中,我们总结了用户开发应用的主要需求:

  1. 数据调研:在正式开发程序之前,首先需要认识待处理的业务数据,包括:数据格式,类型(若以表结构存储则对应到字段类型)、存储方式、有无脏数据,甚至分析根据业务逻辑实现是否可能存在数据倾斜等等。这个需求十分基础且重要,只有对数据有充分的掌控,才能写出高效的Spark代码;
  2. 代码调试:业务的编码实现很难保证一蹴而就,可能需要不断地调试;如果每次少量的修改,测试代码都需要经过编译、打包、提交线上,会对用户的开发效率影响是非常大的;
  3. 联合开发:对于一整个业务的实现,一般会有多方的协作,这时候需要能有一个方便的代码和执行结果共享的途径,用于分享各自的想法和试验结论。

基于这些需求,我们调研了现有的开源系统,最终选择了Apache的孵化项目Zeppelin,将其作为基于Spark的交互式开发平台。Zeppelin整合了Spark,Markdown,Shell,Angular等引擎,集成了数据分析和可视化等功能。

 Zeppelin实例

我们在原生的Zeppelin上增加了用户登陆认证、用户行为日志审计、权限管理以及执行Spark作业资源隔离,打造了一个美团的Spark的交互式开发平台,不同的用户可以在该平台上调研数据、调试程序、共享代码和结论。

集成在Zeppelin的Spark提供了三种解释器:Spark、Pyspark、SQL,分别适用于编写Scala、Python、SQL代码。对于上述的数据调研需求,无论是程序设计之初,还是编码实现过程中,当需要检索数据信息时,通过Zeppelin提供的SQL接口可以很便利的获取到分析结果;另外,Zeppelin中Scala和Python解释器自身的交互式特性满足了用户对Spark和Pyspark分步调试的需求,同时由于Zeppelin可以直接连接线上集群,因此可以满足用户对线上数据的读写处理请求;最后,Zeppelin使用Web Socket通信,用户只需要简单地发送要分享内容所在的http链接,所有接受者就可以同步感知代码修改,运行结果等,实现多个开发者协同工作。

Spark作业ETL模板

除了提供平台化的工具以外,我们也会从其他方面来提高用户的开发效率,比如将类似的需求进行封装,提供一个统一的ETL模板,让用户可以很方便的使用Spark实现业务需求。

美团目前的数据生产主体是通过ETL将原始的日志通过清洗、转换等步骤后加载到Hive表中。而很多线上业务需要将Hive表里面的数据以一定的规则组成键值对,导入到Tair中,用于上层应用快速访问。其中大部分的需求逻辑相同,即把Hive表中几个指定字段的值按一定的规则拼接成key值,另外几个字段的值以json字符串的形式作为value值,最后将得到的对写入Tair。

 hive2Tair流程示例

由于Hive表中的数据量一般较大,使用单机程序读取数据和写入Tair效率比较低,因此部分业务方决定使用Spark来实现这套逻辑。最初由业务方的工程师各自用Spark程序实现从Hive读数据,写入到Tair中(以下简称hive2Tair流程),这种情况下存在如下问题:
每个业务方都要自己实现一套逻辑类似的流程,产生大量重复的开发工作;
由于Spark是分布式的计算引擎,因此代码实现和参数设置不当很容易对Tair集群造成巨大压力,影响Tair的正常服务。
基于以上原因,我们开发了Spark版的hive2Tair流程,并将其封装成一个标准的ETL模板,其格式和内容如下所示:

 hive2Tair  ETL模版

source用于指定Hive表源数据,target指定目标Tair的库和表,这两个参数可以用于调度系统解析该ETL的上下游依赖关系,从而很方便地加入到现有的ETL生产体系中。

有了这个模板,用户只需要填写一些基本的信息(包括Hive表来源,组成key的字段列表,组成value的字段列表,目标Tair集群)即可生成一个hive2Tair的ETL流程。整个流程生成过程不需要任何Spark基础,也不需要做任何的代码开发,极大地降低了用户的使用门槛,避免了重复开发,提高了开发效率。该流程执行时会自动生成一个Spark作业,以相对保守的参数运行:默认开启动态资源分配,每个Executor核数为2,内存2GB,最大Executor数设置为100。如果对于性能有很高的要求,并且申请的Tair集群比较大,那么可以使用一些调优参数来提升写入的性能。目前我们仅对用户暴露了设置Executor数量以及每个Executor内存的接口,并且设置了一个相对安全的最大值规定,避免由于参数设置不合理给Hadoop集群以及Tair集群造成异常压力。

基于Spark的用户特征平台

在没有特征平台之前,各个数据挖掘人员按照各自项目的需求提取用户特征数据,主要是通过美团的ETL调度平台按月/天来完成数据的提取。

但从用户特征来看,其实会有很多的重复工作,不同的项目需要的用户特征其实有很多是一样的,为了减少冗余的提取工作,也为了节省计算资源,建立特征平台的需求随之诞生,特征平台只需要聚合各个开发人员已经提取的特征数据,并提供给其他人使用。特征平台主要使用Spark的批处理功能来完成数据的提取和聚合。
开发人员提取特征主要还是通过ETL来完成,有些数据使用Spark来处理,比如用户搜索关键词的统计。
开发人员提供的特征数据,需要按照平台提供的配置文件格式添加到特征库,比如在图团购的配置文件中,团购业务中有一个用户24小时时段支付的次数特征,输入就是一个生成好的特征表,开发人员通过测试验证无误之后,即完成了数据上线;另外对于有些特征,只需要从现有的表中提取部分特征数据,开发人员也只需要简单的配置即可完成。

 特征平台 数据流向图

在图中,我们可以看到特征聚合分两层,第一层是各个业务数据内部聚合,比如团购的数据配置文件中会有很多的团购特征、购买、浏览等分散在不同的表中,每个业务都会有独立的Spark任务来完成聚合,构成一个用户团购特征表;特征聚合是一个典型的join任务,对比MapReduce性能提升了10倍左右。第二层是把各个业务表数据再进行一次聚合,生成最终的用户特征数据表。
特征库中的特征是可视化的,我们在聚合特征时就会统计特征覆盖的人数,特征的最大最小数值等,然后同步到RDB,这样管理人员和开发者都能通过可视化来直观地了解特征。 另外,我们还提供特征监测和告警,使用最近7天的特征统计数据,对比各个特征昨天和今天的覆盖人数,是增多了还是减少了,比如性别为女这个特征的覆盖人数,如果发现今天的覆盖人数比昨天低了1%(比如昨天6亿用户,女性2亿,那么人数降低了1%*2亿=2百万)突然减少2万女性用户说明数据出现了极大的异常,何况网站的用户数每天都是增长的。这些异常都会通过邮件发送到平台和特征提取的相关人。

Spark数据挖掘平台

数据挖掘平台是完全依赖于用户特征库的,通过特征库提供用户特征,数据挖掘平台对特征进行转换并统一格式输出,就此开发人员可以快速完成模型的开发和迭代,之前需要两周开发一个模型,现在短则需要几个小时,多则几天就能完成。特征的转换包括特征名称的编码,也包括特征值的平滑和归一化,平台也提供特征离散化和特征选择的功能,这些都是使用Spark离线完成。

开发人员拿到训练样本之后,可以使用Spark mllib或者Python sklearn等完成模型训练,得到最优化模型之后,将模型保存为平台定义好的模型存储格式,并提供相关配置参数,通过平台即可完成模型上线,模型可以按天或者按周进行调度。当然如果模型需要重新训练或者其它调整,那么开发者还可以把模型下线。不只如此,平台还提供了一个模型准确率告警的功能,每次模型在预测完成之后,会计算用户提供的样本中预测的准确率,并比较开发者提供的准确率告警阈值,如果低于阈值则发邮件通知开发者,是否需要对模型重新训练。

在开发挖掘平台的模型预测功时能我们走了点弯路,平台的模型预测功能开始是兼容Spark接口的,也就是使用Spark保存和加载模型文件并预测,使用过的人知道Spark mllib的很多API都是私有的开发人员无法直接使用,所以我们这些接口进行封装然后再提供给开发者使用,但也只解决了Spark开发人员的问题,平台还需要兼容其他平台的模型输出和加载以及预测的功能,这让我们面临必需维护一个模型多个接口的问题,开发和维护成本都较高,最后还是放弃了兼容Spark接口的实现方式,我们自己定义了模型的保存格式,以及模型加载和模型预测的功能。

 数据挖掘平台结构图

以上内容介绍了美团基于Spark所做的平台化工作,这些平台和工具是面向全公司所有业务线服务的,旨在避免各团队做无意义的重复性工作,以及提高公司整体的数据生产效率。目前看来效果是比较好的,这些平台和工具在公司内部得到了广泛的认可和应用,当然也有不少的建议,推动我们持续地优化。
随着Spark的发展和推广,从上游的ETL到下游的日常数据统计分析、推荐和搜索系统,越来越多的业务线开始尝试使用Spark进行各种复杂的数据处理和分析工作。下面将以Spark在交互式用户行为分析系统以及SEM投放服务为例,介绍Spark在美团实际业务生产环境下的应用。

Spark在交互式用户行为分析系统中的实践

美团的交互式用户行为分析系统,用于提供对海量的流量数据进行交互式分析的功能,系统的主要用户为公司内部的PM和运营人员。普通的BI类报表系统,只能够提供对聚合后的指标进行查询,比如PV、UV等相关指标。但是PM以及运营人员除了查看一些聚合指标以外,还需要根据自己的需求去分析某一类用户的流量数据,进而了解各种用户群体在App上的行为轨迹。根据这些数据,PM可以优化产品设计,运营人员可以为自己的运营工作提供数据支持,用户核心的几个诉求包括:

  1. 自助查询,不同的PM或运营人员可能随时需要执行各种各样的分析功能,因此系统需要支持用户自助使用。
  2. 响应速度,大部分分析功能都必须在几分钟内完成。
  3. 可视化,可以通过可视化的方式查看分析结果。

要解决上面的几个问题,技术人员需要解决以下两个核心问题:

  1. 海量数据的处理,用户的流量数据全部存储在Hive中,数据量非常庞大,每天的数据量都在数十亿的规模。
  2. 快速计算结果,系统需要能够随时接收用户提交的分析任务,并在几分钟之内计算出他们想要的结果。

要解决上面两个问题,目前可供选择的技术主要有两种:MapReduce和Spark。在初期架构中选择了使用MapReduce这种较为成熟的技术,但是通过测试发现,基于MapReduce开发的复杂分析任务需要数小时才能完成,这会造成极差的用户体验,用户无法接受。

因此我们尝试使用Spark这种内存式的快速大数据计算引擎作为系统架构中的核心部分,主要使用了Spark Core以及Spark SQL两个组件,来实现各种复杂的业务逻辑。实践中发现,虽然Spark的性能非常优秀,但是在目前的发展阶段中,还是或多或少会有一些性能以及OOM方面的问题。因此在项目的开发过程中,对大量Spark作业进行了各种各样的性能调优,包括算子调优、参数调优、shuffle调优以及数据倾斜调优等,最终实现了所有Spark作业的执行时间都在数分钟左右。并且在实践中解决了一些shuffle以及数据倾斜导致的OOM问题,保证了系统的稳定性。

结合上述分析,最终的系统架构与工作流程如下所示:

  1. 用户在系统界面中选择某个分析功能对应的菜单,并进入对应的任务创建界面,然后选择筛选条件和任务参数,并提交任务。
  2. 由于系统需要满足不同类别的用户行为分析功能(目前系统中已经提供了十个以上分析功能),因此需要为每一种分析功能都开发一个Spark作业。
  3. 采用J2EE技术开发了Web服务作为后台系统,在接收到用户提交的任务之后,根据任务类型选择其对应的Spark作业,启动一条子线程来执行Spark-submit命令以提交Spark作业。
  4. Spark作业运行在Yarn集群上,并针对Hive中的海量数据进行计算,最终将计算结果写入数据库中。
  5. 用户通过系统界面查看任务分析结果,J2EE系统负责将数据库中的计算结果返回给界面进行展现。

 交互式用户行为分析系统架构

该系统上线后效果良好:90%的Spark作业运行时间都在5分钟以内,剩下10%的Spark作业运行时间在30分钟左右,该速度足以快速响应用户的分析需求。通过反馈来看,用户体验非常良好。目前每个月该系统都要执行数百个用户行为分析任务,有效并且快速地支持了PM和运营人员的各种分析需求。

Spark在SEM投放服务中的应用

流量技术组负责着美团站外广告的投放技术,目前在SEM、SEO、DSP等多种业务中大量使用了Spark平台,包括离线挖掘、模型训练、流数据处理等。美团SEM(搜索引擎营销)投放着上亿的关键词,一个关键词从被挖掘策略发现开始,就踏上了精彩的SEM之旅。它经过预估模型的筛选,投放到各大搜索引擎,可能因为市场竞争频繁调价,也可能因为效果不佳被迫下线。而这样的旅行,在美团每分钟都在发生。如此大规模的随机“迁徙”能够顺利进行,Spark功不可没。

 SEM服务架构图

Spark不止用于美团SEM的关键词挖掘、预估模型训练、投放效果统计等大家能想到的场景,还罕见地用于关键词的投放服务,这也是本段介绍的重点。一个快速稳定的投放系统是精准营销的基础。

美团早期的SEM投放服务采用的是单机版架构,随着关键词数量的极速增长,旧有服务存在的问题逐渐暴露。受限于各大搜索引擎API的配额(请求频次)、账户结构等规则,投放服务只负责处理API请求是远远不够的,还需要处理大量业务逻辑。单机程序在小数据量的情况下还能通过多进程勉强应对,但对于如此大规模的投放需求,就很难做到“兼顾全局”了。

新版SEM投放服务在15年Q2上线,内部开发代号为Medusa。在Spark平台上搭建的Medusa,全面发挥了Spark大数据处理的优势,提供了高性能高可用的分布式SEM投放服务,具有以下几个特性:

  1. 低门槛,Medusa整体架构的设计思路是提供数据库一样的服务。在接口层,让RD可以像操作本地数据库一样,通过SQL来“增删改查”线上关键词表,并且只需要关心自己的策略标签,不需要关注关键词的物理存储位置。Medusa利用Spark SQL作为服务的接口,提高了服务的易用性,也规范了数据存储,可同时对其他服务提供数据支持。基于Spark开发分布式投放系统,还可以让RD从系统层细节中解放出来,全部代码只有400行。
  2. 高性能、可伸缩,为了达到投放的“时间”、“空间”最优化,Medusa利用Spark预计算出每一个关键词在远程账户中的最佳存储位置,每一次API请求的最佳时间内容。在配额和账号容量有限的情况下,轻松掌控着亿级的在线关键词投放。通过控制Executor数量实现了投放性能的可扩展,并在实战中做到了全渠道4小时全量回滚。
  3. 高可用,有的同学或许会有疑问:API请求适合放到Spark中做吗?因为函数式编程要求函数是没有副作用的纯函数(输入是确定的,输出就是确定的)。这确实是一个问题,Medusa的思路是把请求API封装成独立的模块,让模块尽量做到“纯函数”的无副作用特性,并参考面向轨道编程的思路,将全部请求log重新返回给Spark继续处理,最终落到Hive,以此保证投放的成功率。为了更精准的控制配额消耗,Medusa没有引入单次请求重试机制,并制定了服务降级方案,以极低的数据丢失率,完整地记录了每一个关键词的旅行。

结论和展望

本文我们介绍了美团引入Spark的起源,基于Spark所做的一些平台化工作,以及Spark在美团具体应用场景下的实践。总体而言,Spark由于其灵活的编程接口、高效的内存计算,能够适用于大部分数据处理场景。在推广和使用Spark的过程中,我们踩过不少坑,也遇到过很多问题,但填坑和解决问题的过程,让我们对Spark有了更深入的理解,我们也期待着Spark在更多的应用场景中发挥重要的作用。

原文出自:https://tech.meituan.com/archives

Spark性能优化指南——基础篇

美团点评阅读(2675)

前言

在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark。大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快、性能更高。

然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。

Spark的性能调优实际上是由很多部分组成的,不是调节几个参数就可以立竿见影提升作业性能的。我们需要根据不同的业务场景以及数据情况,对Spark作业进行综合性的分析,然后进行多个方面的调节和优化,才能获得最佳性能。

笔者根据之前的Spark作业开发经验以及实践积累,总结出了一套Spark作业的性能优化方案。整套方案主要分为开发调优、资源调优、数据倾斜调优、shuffle调优几个部分。开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则,是高性能Spark作业的基础;数据倾斜调优,主要讲解了一套完整的用来解决Spark作业数据倾斜的解决方案;shuffle调优,面向的是对Spark的原理有较深层次掌握和研究的同学,主要讲解了如何对Spark作业的shuffle运行过程以及细节进行调优。

本文作为Spark性能优化指南的基础篇,主要讲解开发调优以及资源调优。

开发调优

调优概述

Spark性能优化的第一步,就是要在开发Spark作业的过程中注意和应用一些性能优化的基本原则。开发调优,就是要让大家了解以下一些Spark基本开发原则,包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到自己的Spark作业中。

原则一:避免创建重复的RDD

通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD;以此类推,循环往复,直到计算出最终我们需要的结果。在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。

我们在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。

一些Spark初学者在刚开始开发Spark作业时,或者是有经验的工程师在开发RDD lineage极其冗长的Spark作业时,可能会忘了自己之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD。这就意味着,我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。

一个简单的例子

// 需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次reduce操作。也就是说,需要对一份数据执行两次算子操作。

// 错误的做法:对于同一份数据执行多次算子操作时,创建多个RDD。
// 这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,然后分别对每个RDD都执行了一个算子操作。
// 这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)

// 正确的用法:对于一份数据执行多次算子操作时,只使用一个RDD。
// 这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个RDD,然后对这一个RDD执行了多次算子操作。
// 但是要注意到这里为止优化还没有结束,由于rdd1被执行了两次算子操作,第二次执行reduce操作的时候,还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销。
// 要彻底解决这个问题,必须结合“原则三:对多次使用的RDD进行持久化”,才能保证一个RDD被多次使用时只被计算一次。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
rdd1.reduce(...)

原则二:尽可能复用同一个RDD

除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

一个简单的例子

// 错误的做法。

// 有一个<Long, String>格式的RDD,即rdd1。
// 接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)

// 分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)

// 正确的做法。

// 上面这个case中,其实rdd1和rdd2的区别无非就是数据格式不同而已,rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。
// 此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。

// 其实在这种情况下完全可以复用同一个RDD。
// 我们可以使用rdd1,既做reduceByKey操作,也做map操作。
// 在进行第二个map操作时,只使用每个数据的tuple._2,也就是rdd1中的value值,即可。
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

// 第二种方式相较于第一种方式而言,很明显减少了一次rdd2的计算开销。
// 但是到这里为止,优化还没有结束,对rdd1我们还是执行了两次算子操作,rdd1实际上还是会被计算两次。
// 因此还需要配合“原则三:对多次使用的RDD进行持久化”进行使用,才能保证一个RDD被多次使用时只被计算一次。

原则三:对多次使用的RDD进行持久化

当你在Spark代码中多次对一个RDD做了算子操作后,恭喜,你已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。

Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。

因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。

对多次使用的RDD进行持久化的代码示例

// 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。

// 正确的做法。
// cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
// 此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。
// 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)

// persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
// 比如说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。
// 而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。
// 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别。

Spark的持久化级别

持久化级别含义解释
MEMORY_ONLY使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
MEMORY_AND_DISK使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
DISK_ONLY使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

如何选择一种最合适的持久化策略

  • 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

  • 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

  • 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

  • 通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

原则四:尽量避免使用shuffle类算子

如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。

shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。

因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

Broadcast与map进行join代码示例

// 传统的join操作会导致shuffle操作。
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)

// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。

原则五:使用map-side预聚合的shuffle操作

如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。

所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

比如如下两幅图,就是典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。

groupByKey实现wordcount原理

reduceByKey实现wordcount原理

原则六:使用高性能的算子

除了shuffle相关的算子有优化原则之外,其他的算子也都有着相应的优化原则。

使用reduceByKey/aggregateByKey替代groupByKey

详情见“原则五:使用map-side预聚合的shuffle操作”。

使用mapPartitions替代普通map

mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!

使用foreachPartitions替代foreach

原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。

使用filter之后进行coalesce操作

通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。

使用repartitionAndSortWithinPartitions替代repartition与sort类操作

repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

原则七:广播大变量

有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。

在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。

因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

广播大变量的代码示例

// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

原则八:使用Kryo优化序列化性能

在Spark中,主要有三个地方涉及到了序列化:

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
  • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
  • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原则九:优化数据结构

Java中,有三种类型比较耗费内存:

  • 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
  • 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
  • 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。

但是在笔者的编码实践中发现,要做到该原则其实并不容易。因为我们同时要考虑到代码的可维护性,如果一个代码中,完全没有任何对象抽象,全部是字符串拼接的方式,那么对于后续的代码维护和修改,无疑是一场巨大的灾难。同理,如果所有操作都基于数组实现,而不使用HashMap、LinkedList等集合类型,那么对于我们的编码难度以及代码可维护性,也是一个极大的挑战。因此笔者建议,在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。

资源调优

调优概述

在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。很多Spark初学者,通常不知道该设置哪些必要的参数,以及如何设置这些参数,最后就只能胡乱设置,甚至压根儿不设置。资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。总之,无论是哪种情况,都会导致Spark作业的运行效率低下,甚至根本无法运行。因此我们必须对Spark作业的资源使用原理有一个清晰的认识,并知道在Spark作业运行过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。

Spark作业基本运行原理

Spark基本运行原理

详细原理见上图。我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。

因此Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。

task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。

以上就是Spark作业的基本运行原理的说明,大家可以结合上图来理解。理解作业基本原理,是我们进行资源参数调优的基本前提。

资源参数调优

了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。

num-executors

  • 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
  • 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

  • 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
  • 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

executor-cores

  • 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
  • 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

driver-memory

  • 参数说明:该参数用于设置Driver进程的内存。
  • 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

spark.default.parallelism

  • 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
  • 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

spark.storage.memoryFraction

  • 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
  • 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
  • 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

资源参数参考示例

以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

写在最后的话

根据实践经验来看,大部分Spark作业经过本次基础篇所讲解的开发调优与资源调优之后,一般都能以较高的性能运行了,足以满足我们的需求。但是在不同的生产环境和项目背景下,可能会遇到其他更加棘手的问题(比如各种数据倾斜),也可能会遇到更高的性能要求。为了应对这些挑战,需要使用更高级的技巧来处理这类问题。在后续的《Spark性能优化指南——高级篇》中,我们会详细讲解数据倾斜调优以及Shuffle调优。

原文出自:https://tech.meituan.com/archives

Spark性能优化指南——高级篇

美团点评阅读(3257)

前言

基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为《Spark性能优化指南》的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题。

数据倾斜调优

调优概述

有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。

数据倾斜发生时的现象

  • 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。

  • 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。

数据倾斜发生的原理

数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。

数据倾斜原理

如何定位导致数据倾斜的代码

数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

某个task执行特别慢的情况

首先要看的,就是数据倾斜发生在第几个stage中。

如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个stage;如果是用yarn-cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。

比如下图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。精准推算stage与代码的对应关系,需要对Spark的源码有深入的理解,这里我们可以介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。

这里我们就以Spark最基础的入门程序——单词计数来举例,如何用最简单的方法大致推算出一个stage对应的代码。如下示例,在整个代码中,只有一个reduceByKey是会发生shuffle的算子,因此就可以认为,以这个算子为界限,会划分出前后两个stage。

  • stage0,主要是执行从textFile到map操作,以及执行shuffle write操作。shuffle write操作,我们可以简单理解为对pairs RDD中的数据进行分区操作,每个task处理的数据中,相同的key会写入同一个磁盘文件内。
  • stage1,主要是执行从reduceByKey到collect操作,stage1的各个task一开始运行,就会首先执行shuffle read操作。执行shuffle read操作的task,会从stage0的各个task所在节点拉取属于自己处理的那些key,然后对同一个key进行全局性的聚合或join等操作,在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子之后,就计算出了最终的wordCounts RDD,然后会执行collect算子,将所有数据拉取到Driver上,供我们遍历和打印输出。
val conf = new SparkConf()
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.collect().foreach(println(_))

通过对单词计数程序的分析,希望能够让大家了解最基本的stage划分的原理,以及stage划分后shuffle操作是如何在两个stage的边界处执行的。然后我们就知道如何快速定位出发生数据倾斜的stage对应代码的哪一个部分了。比如我们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,判定stage1出现了数据倾斜,那么就可以回到代码中定位出stage1主要包括了reduceByKey这个shuffle类算子,此时基本就可以确定是由educeByKey算子导致的数据倾斜问题。比如某个单词出现了100万次,其他单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。

某个task莫名其妙内存溢出的情况

这种情况下去定位出问题的代码就比较容易了。我们建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn-cluster模式下的log中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有shuffle类算子,此时很可能就是这个算子导致了数据倾斜。

但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

查看导致数据倾斜的key的数据分布情况

知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决。

此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:

  1. 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
  2. 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

举例来说,对于上面所说的单词计数程序,如果确定了是stage1的reduceByKey算子导致了数据倾斜,那么就应该看看进行reduceByKey操作的RDD中的key分布情况,在这个例子中指的就是pairs RDD。如下示例,我们可以先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

数据倾斜的解决方案

解决方案一:使用Hive ETL预处理数据

方案适用场景:导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。

方案缺点:治标不治本,Hive ETL中还是会发生数据倾斜。

方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。

项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6倍以上。

解决方案二:过滤少数导致倾斜的key

方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。

方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

方案缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。

方案实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。

解决方案三:提高shuffle操作的并行度

方案适用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。

方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。

方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。具体原理如下图所示。

方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。

方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。

方案实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

解决方案四:两阶段聚合(局部聚合+全局聚合)

方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。

方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

// 第一步,给RDD中的每个key都打上一个随机前缀。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 第二步,对打上随机前缀的key进行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// 第三步,去除RDD中每个key的随机前缀。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// 第四步,对去除了随机前缀的RDD进行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

解决方案五:将reduce join转为map join

方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。

方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。具体原理如下图所示。

方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。

方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

// 首先将数据量比较小的RDD的数据,collect到Driver中来。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
// 可以尽可能节省内存空间,并且减少网络传输性能开销。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// 对另外一个RDD执行map类操作,而不再是join类操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for(Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // 获取当前RDD数据的key以及value。
                String key = tuple._1;
                String value = tuple._2;
                // 从rdd1数据Map中,根据key获取到可以join到的数据。
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });

// 这里得提示一下。
// 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。
// 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。
// rdd2中每条数据都可能会返回多条join后的数据。

解决方案六:采样倾斜key并分拆join操作

方案适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

方案实现思路:

  • 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
  • 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
  • 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
  • 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
  • 而另外两个普通的RDD就照常join即可。
  • 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

方案实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。

方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

// 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
// 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }     
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair( 
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            } 
        });

// rdd2,就是那个所有key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }

        });

// 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。
// 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedUserid2infoRDD)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });

// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// 将倾斜key join后的结果与普通key join后的结果,uinon起来。
// 就是最终的join结果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

解决方案七:使用随机前缀和扩容RDD进行join

方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

  • 该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。
  • 然后将该RDD的每条数据都打上一个n以内的随机前缀。
  • 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
  • 最后将两个处理后的RDD进行join即可。

方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。

方案实践经验:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。

// 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 将两个处理后的RDD进行join即可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

解决方案八:多种方案组合使用

在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。比如说,我们针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次可以对某些shuffle操作提升并行度,优化其性能;最后还可以针对不同的聚合或join操作,选择一种方案来优化其性能。大家需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

shuffle调优

调优概述

大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则,千万不要舍本逐末。下面我们就给大家详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

ShuffleManager发展概述

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进。

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

下面我们详细分析一下HashShuffleManager和SortShuffleManager的原理。

HashShuffleManager运行原理

未经优化的HashShuffleManager

下图说明了未经优化的HashShuffleManager的原理。这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

我们先从shuffle write开始说起。shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

接着我们来说说shuffle read。shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

优化后的HashShuffleManager

下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

SortShuffleManager运行原理

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

普通运行机制

下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

bypass运行机制

下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
  • 不是聚合类的shuffle算子(比如reduceByKey)。

此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

shuffle相关参数调优

以下是Shffule过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
  • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

写在最后的话

本文分别讲解了开发过程中的优化原则、运行前的资源参数设置调优、运行中的数据倾斜的解决方案、为了精益求精的shuffle调优。希望大家能够在阅读本文之后,记住这些性能调优的原则以及方案,在Spark作业开发、测试以及运行的过程中多尝试,只有这样,我们才能开发出更优的Spark作业,不断提升其性能。

原文出自:https://tech.meituan.com/archives

大众点评支付渠道网关系统的实践之路

美团点评阅读(2778)

业务的快速增长,要求系统在快速迭代的同时,保持很好的扩展性和可用性。其中,交易系统除了满足上述要求之外,还必须保持数据的强一致性。对系统开发人员而言,这既是机遇,也是挑战。本文主要梳理大众点评支付渠道网关系统在面对这些成长烦恼时的演进之路,以及过程中的一些思考和实践。

在整个系统的演进过程中,核心思路是:大系统做小,做简单(具体描述可参考《高可用性系统在大众点评的实践与经验》)。在渠道网关系统实践过程中,可以明显区分出几个有代表性的阶段。

一、能用阶段

早期业务流量还不是很大,渠道网关系统业务逻辑也很简单,一句话总结就是:让用户在交易的时候,能顺利把钱给付了。做的事情可简单概括成3件:发起支付请求、接收支付成功通知以及用户要求退款时原路退回给用户的支付账户。这个阶段系统实践比较简单,主要就是“短、平、快”,快速接入新的第三方支付渠道并保证能用。系统架构如图1。
能用阶段

二、可用阶段

在系统演进初期的快速迭代过程中,接入的第三方支付渠道不多,系统运行还算比较平稳,一些简单问题也可通过开发人员人工快速解决。但随着接入的第三方支付渠道不断增多,逐渐暴露出一些新的问题:

(1) 所有的业务逻辑都在同一个物理部署单元,不同业务之间互相影响(例如退款业务出现问题,但是与此同时把支付业务也拖垮了);

(2) 随着业务流量的增大,数据库的压力逐渐增大,数据库的偶尔波动造成系统不稳定,对用户的支付体验影响很大;

(3) 支付、退款等状态的同步很大程度上依赖第三方支付渠道的异步通知,一旦第三方支付渠道出现问题,造成大量客诉,用户体验很差,开发、运营都很被动。

针对(1)中的业务之间互相影响问题,我们首先考虑进行服务拆分,将之前一个大的物理部署单元拆成多个物理部署单元。有两种明显的可供选择的拆分策略:

  • 按照渠道拆分,不同的第三方支付渠道独立一个物理部署单元,例如微信一个部署单元,支付宝一个部署单元等;
  • 按照业务类型拆分,不同的业务独立一个物理部署单元,例如支付业务一个部署单元,退款业务一个部署单元等。

考虑到在当时的流量规模下,支付业务优先级最高,退款等业务的优先级要低;而有些渠道的流量占比很小,作为一个独立的部署单元,会造成一定的资源浪费,且增加了系统维护的复杂度。基于此,我们做了一个符合当时系统规模的trade-off:选择了第2种拆分策略 — 按照业务类型拆分。

针对(2)中的DB压力问题,我们和DBA一起分析原因,最终选择了Master-Slave方案。通过增加Slave来缓解查询压力;通过强制走Master来保证业务场景的强一致性;通过公司的DB中间件Zebra来做负载均衡和灾备切换,保证DB的高可用性。

针对(3)中的状态同步问题,我们对不同渠道进行梳理,在已有的第三方支付渠道异步通知的基础上,通过主动查询定时批量同步状态,解决了绝大部分状态同步问题。对于仍未同步的少量Case,系统开放出供内部使用的API,方便后台接入和开发人员手动补单。

在完成上述的实践之后,渠道网关系统已达到基本可用阶段,通过内部监控平台可以看到,核心服务接口可用性都能达到99.9%以上。演化之后的系统架构如图2。
可用阶段

三、柔性可用阶段

在解决了业务隔离、DB压力、状态同步等问题后,渠道网关系统度过一段稳定可用的时期。但架不住业务飞速增长的压力,之前业务流量规模下的一些小的系统波动、流量冲击等异常,在遭遇流量洪峰时被急剧放大,最终可能成为压垮系统的最后一根稻草。在新的业务流量规模下,我们面临着新的挑战:

(1) 随着团队的壮大,新加入的同学在接入新的渠道或者增加新的逻辑时,往往都会优先选用自己熟悉的方式完成任务。但熟悉的不一定是合理的,有可能会引入新的风险。特别是在与第三方渠道对接时,系统目前在使用的HTTP交互框架就有 JDK HttpURLConnection/HttpsURLConnection、Httpclient3.x、Httpclient4.x(4.x版本内部还分别有使用不同的小版本)。仅在这个上面就踩过好几次惨痛的坑。

(2) 在按业务类型进行服务拆分后,不同业务不再互相影响。但同一业务内部,之前流量规模小的时候,偶尔波动一次影响不大,现在流量增大后,不同渠道之间就开始互相影响。例如支付业务,对外统一提供分布式的支付API,所有渠道共享同一个服务RPC连接池,一旦某一个渠道的支付接口性能恶化,导致大量占用服务RPC连接,其他正常渠道的请求都无法进来;而故障渠道性能恶化直接导致用户无法通过该渠道支付成功,连锁反应导致用户多次重试,从而进一步导致恶化加剧,最终引起系统雪崩,拒绝服务,且重启后的服务还有可能被大量的故障渠道重试请求给再次击垮。

(3) 目前接入的第三方支付渠道,无论是第三方支付公司、银行或是其他外部支付机构,基本都是通过重定向或SDK的方式引导用户完成最终支付动作。在这条支付链路中,渠道网关系统只是在后端与第三方支付渠道进行交互(生成支付重定向URL或预支付凭证),且只能通过第三方支付渠道的异步通知或自己主动进行支付查询才能得知最终用户支付结果。一旦某个第三方支付渠道内部发生故障,渠道网关系统完全无法得知该支付链路已损坏,这对用户支付体验造成损害。

(4) 现有的渠道网关的DB,某些非渠道网关服务仍可直接访问,这对渠道网关系统的DB稳定性、DB容量规划等带来风险,进而影响渠道网关系统的可用性,内部戏称被戴了“绿帽子”。

(5) 对于退款链路,系统目前未针对退款异常case进行统一收集、整理并分类,且缺乏一个清晰的退款链路监控。这导致用户申请退款后,少量用户的退款请求最终未处理成功,用户发起客诉。同时由于缺乏监控,导致这种异常退款缺乏一个后续推进措施,极端情形下,引起用户二次客诉,极大损害用户体验和公司信誉度。

为最大程度解决问题(1)中描述的风险,在吸取踩坑的惨痛教训后,我们针对第三方渠道对接,收集并整理不同的应用场景,抽象出一套接入框架。接入框架定义了请求组装、请求执行、响应解析和错误重试这一整套网关交互流程,屏蔽了底层的HTTP或Socket交互细节,并提供相应的扩展点。针对银行渠道接入存在前置机这种特殊的应用场景,还基于Netty抽象出连接池(Conn Pool)和简单的负载均衡机制(LB, 提供Round Robin路由策略)。不同渠道在接入时可插入自定义的组装策略(扩展已有的HttpReq、HttpsReq或NettyReq),执行策略[扩展已有(Http、Https或Netty)Sender/Receiver],解析策略(扩展已有的HttpResp、HttpsResp或NettyResp),并复用框架已提供的内容解析(binary/xml/json parser)、证书加载(keystore/truststore loader)和加解密签名(encrypt/decrypt/sign/verify sign)组件,从而在达到提高渠道接入效率的同时,尽可能减少新渠道接入带来的风险。接入框架的流程结构如图3。
渠道接入框架

为解决问题(2)中渠道之间相互影响,一个简单直观的思路就是渠道隔离。如何隔离,隔离到什么程度?这是2个主要的问题点:

  • 如何隔离 考虑过将支付服务进一步按照渠道拆分,将系统继续做小,但是拆分后,支付API的调用端需要区分不同渠道调用不同的支付API接口,这相当于将渠道隔离问题抛给了调用端;同时拆分后服务增多,调用端需要维护同一渠道支付业务的多个不同RPC-API,复杂度提高,增加了开发人员的维护负担,这在当前的业务流量规模下不太可取。所以我们选择了在同一个支付服务API内部进行渠道隔离。由于共用同一个支付服务服务API连接池,渠道隔离的首要目标就是避免故障渠道大量占用AP连接池,对其他正常渠道造成株连影响。如果能够自动检测出故障渠道,并在其发生故障的初期阶段就快速失败该故障渠道的请求,则从业务逻辑上就自动完成了故障渠道的隔离。
  • 隔离到什么程度 一个支付渠道下存在不同的支付方式(信用卡支付、借记卡支付、余额支付等),而有些支付方式(例如信用卡支付)还存在多个银行。所以我们直接将渠道隔离的最小粒度定义到支付渠道 -> 支付方式 -> 银行。

基于上述的思考,我们设计并实现了一个针对故障渠道的快速失败(fail-fast)机制:

  • 将每一笔支付请求所附带的支付信息抽象为一个特定的fail-fast路径,请求抽象成一个fail-fast事务,请求成功即认为事务成功,反之,事务失败。
  • 在fail-fast事务执行过程中,级联有2个fail-fast断路开关:
    • 静态开关,根据人工配置(on/off),断定某个支付请求是否需快速失败。
    • 动态开关,根据历史统计信息,确定当前健康状态,进而断定是否快速失败当前支付请求。
  • 动态断路开关抽象了3种健康状态(closed-放行所有请求;half_open-部分比例的请求放行;open-快速失败所有请求),并依据历史统计信息(总请求量/请求失败量/请求异常量/请求超时量),在其内部维护了一个健康状态变迁的状态机。状态变迁如图4。
    FailFast状态变迁
  • 状态机的每一次状态变迁都会产生一个健康状态事件,收银台服务可以监听这个健康状态事件,实现支付渠道的联动上下线切换。
  • 每一笔支付请求结束后都会动态更新历史统计信息。

经过线上流量模拟压测观察,fail-fast机制给系统支付请求增加了1~5ms的额外耗时,相比第三方渠道的支付接口耗时,占比1%~2%,属于可控范围。渠道故障fail-fast机制上线之后,结合压测配置,经过几次微调,稳定了线上环境的fail-fast配置参数。

在前不久的某渠道支付故障时,通过公司内部的监控平台,明显观察到fail-fast机制起到很好的故障隔离效果,如下图5。
故障隔离效果

为解决问题(3)中支付链路可用性监测,依赖公司内部的监控平台上报,实时监控支付成功通知趋势曲线;同时渠道网关系统内部从业务层面自行实现了支付链路端到端的监控。秒级监控支付链路端到端支付成功总量及支付成功率,并基于这2个指标的历史统计信息,提供实时的支付链路邮件或短信报警。而在流量高峰时,该监控还可通过人工手动降级(异步化或关闭)。这在很大程度上提高了开发人员的核心支付链路故障响应速度。

为解决问题(4)中的“绿帽子”,渠道网关系统配合DBA回收所有外部系统的DB直接访问权限,提供替换的API以供外部系统访问,这给后续的提升DB稳定性、DB容量规划以及后续可能的异步多机房部署打下基础。

针对问题(5)中退款case,渠道网关系统配合退款链路上的其他交易、支付系统,从源头上对第三方渠道退款异常case进行统一收集、整理并分类,并形成退款链路核心指标(退款当日成功率/次日成功率/7日成功率)监控,该部分的系统实践会随着后续的“退款链路统一优化”一起进行分享;

随着上述实践的逐步完成,渠道网关系统的可用性得到显著提高,核心链路的API接口可用性达到99.99%,在公司的917大促中,渠道网关系统平稳度过流量高峰,并迎来了新的记录:提交第三方渠道支付请求的TPS达到历史新高。且在部分渠道接口发生故障时,能保证核心支付API接口的稳定性,并做到故障渠道的自动检测、恢复,实现收银台对应渠道的联动上下线切换。同时,通过核心支付链路支付成功率监控,实现第三方渠道内部故障时,渠道上下线的手动切换。至此,基本保证了在部分第三方渠道有损的情况下,渠道网关系统的柔性可用。演化后的此阶段系统架构如图6。
柔性可用阶段

四、经验与总结

在整个渠道网关系统一步步的完善过程中,踩过很多坑,吃过很多教训,几点小的收获:

  1. 坚持核心思想,拆分、解耦,大系统做小,做简单;
  2. 系统总会有出问题的时候,重要的是如何快速定位、恢复、解决问题,这是一个长期而又艰巨的任务;
  3. 高可用性的最大敌人不仅是技术,还是使用技术实现系统的人,如何在业务、系统快速迭代的过程中,保证自我驱动,不掉队;
  4. 高流量,大并发对每一个工程师既是挑战,更是机遇。

原文出自:https://tech.meituan.com/archives

专访美团外卖曹振团:天下武功唯快不破

美团点评阅读(2716)

本文转自InfoQ中文网站,首发地址:http://www.infoq.com/cn/news/2016/06/Meituan-take-away

马云曾经说过:世界是懒人创造出来的。在“懒人”们的推动下,O2O的战火已经燃烧到了外卖行业。据报告,2015年外卖市场年交易额已经达到450余亿元,其中美团外卖从2013年底上线以来,已经迅速超过了400万日订单。作为美团内部的创业项目,美团外卖是如何一步步从复用美团的基础服务,到如今具有在高并发下百万级别订单的业务系统的。

2016年7月15-16日,ArchSummit全球架构师峰会将在深圳举行。本届大会,我们邀请了美团外卖技术专家曹振团老师,前来分享《美团外卖系统架构演进与系统稳定性经验谈》的内容,讲述在美团外卖业务的高速发展中,系统架构各阶段面临的不同挑战以及对应的解决方案。

现在我们就来采访曹振团老师,让老师给大家送一份充满干货的“外卖”。

InfoQ:在你认为,怎样的架构是一个好架构,怎样的架构师是一个好的架构师?能否谈谈如何避免一个“坏”架构的诞生?

曹振团:一个好的架构是与业务特点及与业务发展阶段相结合的、因地制宜且与时俱进的。一个好的架构既能支撑业务的快速稳定发展,又能留有一定的灵活性和扩展性。而好的架构师需要做好三点:

  1. 首先需要充分理解业务,只有充分理解业务之后才能使架构不仅能够很好地支持业务特点,并具有一定的前瞻性。架构师需要站在推进业务发展的角度上合理地改进和优化架构设计,为业务的快速发展做好保障。
  2. 其次,架构师要具备良好的沟通和协调能力。架构师往往要面临着跨组、跨团队、跨事业部甚至跨事业群的一些技术方案,需要沟通和协调各方的诉求和冲突。
  3. 最后要具备持续学习的能力。不仅要学习技术知识的变化,更要学习业界的发展思路、最佳实践等。

好的架构是实践出来的,好的架构师也是在实践中快速成长起来的。新美大内部对架构师的培养非常注重实践和交流。架构师都在业务一线深入了解业务、需求及技术特性。美团外卖业务在快速发展的过程中踩了很多坑,这也让我们的架构师更快的成长起来。新美大内部各事业群的Casestudy都是共享的,这也是一个非常好的学习平台,从他人的经验和总结中学习。另外内部开设有架构专题分享,提供给大家一个架构实践的交流和讨论的平台。

架构通常是为了解决系统的高并发、高性能、高可用的问题,结合业务特点在研发资源、排期、技术方案之间做平衡。一个“坏”的架构则破坏了这种平衡,比如:由于工期紧张而引入了一个自己并不能把控的技术方案,为系统的稳定性埋下了雷。还有一个简单的判断标准是:当采用这个架构后在未来多长时间或几倍增量下需要调整架构。基本上要求至少在未来的半年到一年内或2倍增量下不需要调整架构。如果架构设计评审不符合这个标准就要及时重新设计或调整。

InfoQ:你曾在网易工作过,能否简单谈谈在架构设计上美团外卖与网易各自的特点是什么?

曹振团:门户网站面对着海量的新闻事件及分类,具有更新量大、过期快和热点集中的特点。新闻资料基本是静态的,所以对缓存技术应用较多。

美团外卖涉及到交易的信息、支付、履约的各个环节,参与方较多。从用户支付下单到商家接单,还有骑手配送等环节,这些过程都是动态有交互的,美团外卖对服务化及服务治理设计较多。

InfoQ:你加入美团后有参与多个创新业务的探索,那么你认为在业务初期好的架构是必要的吗,为什么呢?能否分享经历了多个创新业务之后的收获,这些经验给美团外卖带来了什么样的收获和挑战?

曹振团:2013年加入美团时,我们在O2O这个方向上进行了多个创新业务的探索,完全是新的事情,从零开始。在早期最重要的事情就是验证需求,验证产品是否能够满足用户的核心需求,是否能够被用户接受。这一阶段就是快速试错,通常以MVP的方式快速迭代。我们需要足够快地找到方向,此时灵活性优先于架构。

加入新美大的早期参与公司内部的创业,在O2O这个领域进行了多个创新业务的探索,最后决定做外卖这个业务。在经历创新业务后最大的收获主要讲两点:

  1. 创业的早期决定不做什么往往比决定做什么更重要。在早期产品技术资源都很紧缺,需要将有限的资源投入到最核心的产品需求上。
  2. 天下武功唯快不破。如今的市场瞬息万变,唯有快速的迭代,方能抢占先机。

创业会给技术带来大量的挑战:首先需要是个全面手,前端、后台、数据库、运维都能够迅速切入。另外也要有足够的技术视野和深度,才能够在技术方案的选择上游刃有余,既能快速的选择合适的方案,又能够把握其技术原理。

InfoQ:美团外卖创建初期是否借鉴了美团的架构设计?美团外卖架构的创建和发展经历了什么样的过程?

曹振团:美团外卖创建初期复用了很多美团的基础服务,主要借鉴了团购业务积累的经验。 美团外卖的架构是随着业务的发展优化和演进的。主要经历了以下几个过程:

  1. 自由发展阶段:业务起步的时候,大家公用服务和数据库表,这样能够快速支持产品迭代。产品和技术人员聚焦在快速验证产品功能上。到底应该有哪些功能,用户会如何点外卖等?这个阶段主要的特点就是集中,所有的功能都集中在几个项目里,所有的表都集中在一个库中。
  2. 故障驱动架构:随着业务的爆发增长,早期的架构出现了很多的问题,系统频繁地出现稳定性的问题,共享数据库表导致业务逻辑散落各地、甚至实现不一致的情况。这时系统稳定性问题倒逼架构进行优化调整,进行了服务化拆分,服务之间全部用接口的方式调用。
  3. 架构驱动改革:随着单量的快速增长,系统故障所造成的损失是巨大的、不可接受的。需要从架构驱动技术体系的改进、甚至推进产品和业务的变革。同时增加业务的容灾能力,进行了多机房的部署。

InfoQ:在外卖交易流程中,哪个环节最容易出现问题,你们是如何解决的?

曹振团:美团外卖在快速发展的过程中踩了很多坑。比如发版引发的故障,这类问题一般是最多的。发版的新功能可能含有bug,可能含有慢SQL,可能没有正确处理网络异常等。外卖业务还是一个实时且流程较长的业务,尤其流量比较集中在中午和晚上的饭点,在高峰时段的QPS是万级别的,外卖整个链条还会涉及到很多的服务,如果某个依赖服务有问题将会影响到其它服务。例如没有设置合理的RPC超时时间,如果服务A挂了,会导致依赖该服务的其它服务出现问题,进而引起连锁反应,最终系统雪崩。

针对这些问题,我们做了关键路径梳理。整个交易流程中梳理出关键路径,非关键调用异步化,关键路径做好容错容灾设计,实时监控关键路径的核心指标。整体系统做了比较全面的应急预案,当有紧急情况发生时,力保关键路径不出问题。

至于如何避免掉坑,分享几点:

  1. 防御式编程,不要相信任何人或服务,做好对自身服务的保护和对依赖服务的熔断。
  2. 每次发版都有预案。回滚是解决发版引发故障的最快捷有效的手段。
  3. 灰度!灰度!灰度!
  4. 完善的监控很重要,需要覆盖系统性能及业务指标,并对这些指标熟知和敏感。

InfoQ: 在美团外卖业务急速发展的同时,目前美团外卖架构上是否有需要改进和突破的地方?是否有关注前沿的技术,目前通过什么渠道来发现持续优化架构的方法?

曹振团: 业务屡创新高及团队规模的扩大,给技术架构带来了越来越多的挑战。如何能够在大规模作战的情况下,既能保持业务开发的敏捷性又能保障系统的稳定性,是我们架构中要重点思考的,我们一直在结合着业务发展的节奏,优化和改进技术架构。一方面要与时俱进地支持多品类的业务扩展及平台化建设;另一方面,在永恒不变的话题“稳定性”方面,在为能够支持更高的业务量,持续抽取和优化基础服务,加强中间件的能力,使各服务能够更聚焦和专注。因此加强基础设施和强固中间件的建设是我们要改进和突破的地方。夯实技术基础设施和中间件,让研发力量更聚焦在业务开发中。

美团外卖在技术架构中大量使用了开源的技术组件,在深入调研后会结合外卖的业务特点做二次开发,感谢Java社区里丰富的开源软件,使得我们能够更好地优化我们的架构。另外一个发现和持续优化架构的方法是全链路的在线压测。我们会定期进行全链路的在线压测,发现系统的瓶颈并优化验证。

InfoQ: 在架构演进过程中分别使用了哪些核心指标来判断公司架构是否改善?你们对核心指标的要求是多少,目前是否已经稳定达到?

曹振团:架构的演进是为了能够提供更可靠的SLA,核心的考量指标有:5倍的容量,4个9的可用性,TP99 200ms的响应时间,目前已经可以稳定达到了。这些指标界定了系统能够在一定的容量下具有稳定和可靠的表现。我们会定期地做全链路的在线压测,通过实时的数据指标监控系统分析和验证是否已经达成架构目标。可用性和响应时间在服务端还是比较好衡量和达成的,但是从用户端看就变得复杂了很多。比如用户的设备、网络环境等都可能会对结果造成影响,我们也在加强用户侧的监控,持续地优化和改进用户体验。

InfoQ: 外卖领域作为一种懒人经济,你们认为这些“懒人”在数据上体现了怎样的特征?

曹振团:有一种说法是“世界是由懒人驱动的”。由于懒得去换台,发明了遥控器。懒得爬楼梯,发明了电梯。如果你懒得出去吃,我们可以送外卖上门。懒是一个强需求。

我们在解决这个强需求的过程中,发现了一些有趣的变化:由于出现了这样的外卖平台,满足了大家懒的需求,越发使大家变“懒”了,平均点外卖的次数在变多,花样在变多(大家的口味在变多)。虽然变“懒”了,但是对品质的要求却在提高。我们通过优化配送链条上的服务,使得能够更及时、快速地为“懒人”提供美食。配送变快了,口感有了更好的保证。这又促使一些之前不提供外卖服务的品牌商家加入到外卖平台上来,使得整个生态更加良性发展。

另外一个特征是,“懒人”的需求也是多样的,“懒人”们希望能够提供更多的外送选择,这将会改写“外卖”的定义,它将会含有更广阔和丰富的含义,而不只是 “美食外卖”。

InfoQ: 作为架构师,有什么感悟和经验和大家分享吗?

曹振团: 主要分享几点吧:

  1. 性能是功能的一部分,稳定是功能的一部分。性能和稳定性需要在开发设计的时候作为产品功能的一部分来考虑,而不是扩展属性。这样才能在开发、测试、上线、运行中全程把握。
  2. 简单即美。把复杂的事情简单化,抓住核心脉络,解决好主要矛盾不见得一个大而全的海纳百川的架构才是好的,相反清晰、明了的架构是我们追求的,简单可依赖。
  3. 使用能够驾驭的技术。丰富的开源社区给了我们很多的选择和视角,任何引入的开源技术都应该理解其原理和本质,只有能够驾驭的技术才能用好。
  4. 细节决定成败。软件架构是一项复杂和精细的工作,只有把每个细节都做极致了,才能保障架构的基石稳健。

InfoQ:感谢曹振团老师接受我们的采访。期待你在ArchSummit全球架构师峰会上的分享。

原文出自:https://tech.meituan.com/archives

Cache应用中的服务过载案例研究

美团点评阅读(2649)

简单地说,过载是外部请求对系统的访问量突然激增,造成请求堆积,服务不可用,最终导致系统崩溃。本文主要分析引入Cache可能造成的服务过载,并讨论相关的预防、恢复策略。Cache在现代系统中使用广泛,由此引入的服务过载隐患无处不在,但却非常隐蔽,容易被忽视。本文希望能为开发者在设计和编写相关类型应用,以及服务过载发生处理时能够有章可循。

一个服务过载案例

本文讨论的案例是指存在正常调用关系的两个系统(假设调用方为A系统,服务方为B系统),A系统对B系统的访问突然超出B系统的承受能力,造成B系统崩溃。造成服务过载的原因很多,这里分析的是严重依赖Cache的系统服务过载。首先来看一种包含Cache的体系结构(如下图所示)。

A系统依赖B系统的读服务,A系统是60台机器组成的集群,B系统是6台机器组成的集群,之所以6台机器能够扛住60台机器的访问,是因为A系统并不是每次都访问B,而是首先请求Cache,只有Cache的相应数据失效时才会请求B。

这正是Cache存在的意义,它让B系统节省了大量机器;如果没有Cache,B系统不得不组成60台机器的集群,如果A也同时依赖除B系统外的另一个系统(假设为C系统)呢?那么C系统也要60台机器,放大的流量将很快耗尽公司的资源。

然而Cache的引入也不是十全十美的,这个结构中如果Cache发生问题,全部的流量将流向依赖方,造成流量激增,从而引发依赖系统的过载。

回到A和B的架构,造成服务过载的原因至少有下面三种:

  1. B系统的前置代理发生故障或者其他原因造成B系统暂时不可用,等B系统系统服务恢复时,其流量将远远超过正常值。
  2. Cache系统故障,A系统的流量将全部流到B系统,造成B系统过载。
  3. Cache故障恢复,但这时Cache为空,Cache瞬间命中率为0,相当于Cache被击穿,造成B系统过载。

第一个原因不太好理解,为什么B系统恢复后流量会猛增呢?主要原因就是缓存的超时时间。当有数据超时的时候,A系统会访问B系统,但是这时候B系统偏偏故障不可用,那么这个数据只好超时,等发现B系统恢复时,发现缓存里的B系统数据已经都超时了,都成了旧数据,这时当然所有的请求就打到了B。

下文主要介绍服务过载的预防和发生后的一些补救方法,以预防为主,从调用方和服务方的视角阐述一些可行方案。

服务过载的预防

所谓Client端指的就是上文结构中的A系统,相对于B系统,A系统就是B系统的Client,B系统相当于Server。

Client端的方案

针对上文阐述的造成服务过载的三个原因:B系统故障恢复、Cache故障、Cache故障恢复,我们看看A系统有哪些方案可以应对。

合理使用Cache应对B系统宕机

一般情况下,Cache的每个Key除了对应Value,还对应一个过期时间T,在T内,get操作直接在Cache中拿到Key对应Value并返回。但是在T到达时,get操作主要有五种模式:

1. 基于超时的简单(stupid)模式

在T到达后,任何线程get操作发现Cache中的Key和对应Value将被清除或标记为不可用,get操作将发起调用远程服务获取Key对应的Value,并更新写回Cache,然后get操作返回新值;如果远程获取Key-Value失败,则get抛出异常。

为了便于理解,举一个码头工人取货的例子:5个工人(线程)去港口取同样Key的货(get),发现货已经过期被扔掉了,这时5个工人各自分别去对岸取新货,然后返回。

2. 基于超时的常规模式

在T到达后,Cache中的Key和对应Value将被清除或标记为不可用,get操作将调用远程服务获取Key对应的Value,并更新写回Cache;此时,如果另一个线程发现Key和Value已经不可用,get操作还需要判断有没有其他线程发起了远程调用,如果有,那么自己就等待,直到那个线程远程获取操作成功,Cache中得Key变得可用,get操作返回新的Value。如果远程获取操作失败,则get操作抛出异常,不会返回任何Value。

还是码头工人的例子:5个工人(线程)去港口取同样Key的货(get),发现货已经过期被扔掉了,那么只需派出一个人去对岸取货,其他四个人在港口等待即可,而不用5个人全去。

基于超时的简单模式和常规模式区别在于对于同一个超时的Key,前者每个get线程一旦发现Key不存在,则发起远程调用获取值;而后者每个get线程发现Key不存在,则还要判断当前是否有其他线程已经发起了远程调用操作获取新值,如果有,自己就简单的等待即可。

显然基于超时的常规模式比基于超时的简单模式更加优化,减少了超时时并发访问后端的调用量。

实现基于超时的常规模式就需要用到经典的Double-checked locking惯用法了。

3. 基于刷新的简单(stupid)模式

在T到达后,Cache中的Key和相应Value不动,但是如果有线程调用get操作,将触发refresh操作,根据get和refresh的同步关系,又分为两种模式:

  • 同步模式:任何线程发现Key过期,都触发一次refresh操作,get操作等待refresh操作结束,refresh结束后,get操作返回当前Cache中Key对应的Value。注意refresh操作结束并不意味着refresh成功,还可能抛了异常,没有更新Cache,但是get操作不管,get操作返回的值可能是旧值。
  • 异步模式:任何线程发现Key过期,都触发一次refresh操作,get操作触发refresh操作,不等refresh完成,直接返回Cache中的旧值。

举上面码头工人的例子说明基于刷新的常规模式:这次还是5工人去港口取货,这时货都在,但是已经旧了,这时5个工人有两种选择:

  • 5个人各自去远程取新货,如果取货失败,则拿着旧货返回(同步模式)
  • 5个人各自通知5个雇佣工去取新货,5个工人拿着旧货先回(异步模式)

4. 基于刷新的常规模式

在T到达后,Cache中的Key和相应Value都不会被清除,而是被标记为旧数据,如果有线程调用get操作,将触发refresh更新操作,根据get和refresh的同步关系,又分为两种模式:

  • 同步模式:get操作等待refresh操作结束,refresh结束后,get操作返回当前Cache中Key对应的Value,注意:refresh操作结束并不意味着refresh成功,还可能抛了异常,没有更新Cache,但是get操作不管,get操作返回的值可能是旧值。如果其他线程进行get操作,Key已经过期,并且发现有线程触发了refresh操作,则自己不等refresh完成直接返回旧值。
  • 异步模式:get操作触发refresh操作,不等refresh完成,直接返回Cache中的旧值。如果其他线程进行get操作,发现Key已经过期,并且发现有线程触发了refresh操作,则自己不等refresh完成直接返回旧值。

再举上面码头工人的例子说明基于刷新的常规模式:这次还是5工人去港口取货,这时货都在,但是已经旧了,这时5个工人有两种选择:

  • 派一个人去远方港口取新货,其余4个人拿着旧货先回(同步模式)。
  • 5个人通知一个雇佣工去远方取新货,5个人都拿着旧货先回(异步模式)。

基于刷新的简单模式和基于刷新的常规模式区别就在于取数线程之间能否感知当前数据是否正处在刷新状态,因为基于刷新的简单模式中取数线程无法感知当前过期数据是否正处在刷新状态,所以每个取数线程都会触发一个刷新操作,造成一定的线程资源浪费。

而基于超时的常规模式和基于刷新的常规模式区别在于前者过期数据将不能对外访问,所以一旦数据过期,各线程要么拿到数据,要么抛出异常;后者过期数据可以对外访问,所以一旦数据过期,各线程要么拿到新数据,要么拿到旧数据。

5. 基于刷新的续费模式

该模式和基于刷新的常规模式唯一的区别在于refresh操作超时或失败的处理上。在基于刷新的常规模式中,refresh操作超时或失败时抛出异常,Cache中的相应Key-Value还是旧值,这样下一个get操作到来时又会触发一次refresh操作。

在基于刷新的续费模式中,如果refresh操作失败,那么refresh将把旧值当成新值返回,这样就相当于旧值又被续费了T时间,后续T时间内get操作将取到这个续费的旧值而不会触发refresh操作。

基于刷新的续费模式也像常规模式那样分为同步模式和异步模式,不再赘述。

下面讨论这5种Cache get模式在服务过载发生时的表现,首先假设如下:

  • 假设A系统的访问量为每分钟M次。
  • 假设Cache能存Key为C个,并且Key空间有N个。
  • 假设正常状态下,B系统访问量为每分钟W次,显然W\<\<M。

这时因为某种原因,比如B长时间故障,造成Cache中得Key全部过期,B系统这时从故障中恢复,五种get模式分析表现分析如下:

  1. 在基于超时和刷新的简单模式中,B系统的瞬间流量将达到和A的瞬时流量M大体等同,相当于Cache被击穿。这就发生了服务过载,这时刚刚恢复的B系统将肯定会被大流量压垮。
  2. 在基于超时和刷新的常规模式中,B系统的瞬间流量将和Cache中Key空间N大体等同。这时是否发生服务过载,就要看Key空间N是否超过B系统的流量上限了。
  3. 在基于刷新的续费模式中,B系统的瞬间流量为W,和正常情况相同而不会发生服务过载。实际上,在基于刷新的续费模式中,不存在Cache Key全部过期的情况,就算把B系统永久性地干掉,A系统的Cache也会基于旧值长久的平稳运行。

第3点,B系统不会发生服务过载的主要原因是基于刷新的续费模式下不会出现chache中的Key全部长时间过期的情况,即使B系统长时间不可用,基于刷新的续费模式也会在一个过期周期内把旧值当成新值继续使用。所以当B系统恢复时,A系统的Cache都处在正常工作状态。

从B系统的角度看,能够抵抗服务过载的基于刷新的续费模式最优。

从A系统的角度看,由于一般情况下A系统是一个高访问量的在线web应用,这种应用最讨厌的一个词就是“线程等待”,因此基于刷新的各种异步模式较优。

综合考虑,基于刷新的异步续费模式是首选

然而凡是有利就有弊,有两点需要注意的地方:

  1. 基于刷新模式最大的缺点是Key-Value一旦放入Cache就不会被清除,每次更新也是新值覆盖旧值,JVM GC永远无法对其进行垃圾收集,而基于超时的模式中,Key-Value超时后如果新的访问没有到来,内存是可以被GC垃圾回收的。所以如果你使用的是寸土寸金的本地内存做Cache就要小心了。
  2. 基于刷新的续费模式需要做好监控,不然有可能Cache中的值已经和真实的值相差很远了,应用还以为是新值而使用。

关于具体的Cache,来自Google的Guava本地缓存库支持上文的第二种、第四种和第五种get操作模式。

但是对于Redis等分布式缓存,只提供原始的get、set方法,而提供的get仅仅是获取,与上文提到的五种get操作模式不是一个概念。开发者想用这五种get操作模式的话不得不自己封装和实现。

五种get操作模式中,基于超时和刷新的简单模式是实现起来最简单的模式,但遗憾的是这两种模式对服务过载完全无免疫力,这可能也是服务过载在大量依赖缓存的系统中频繁发生的一个重要原因吧。

本文之所以把第1、3种模式称为stupid模式,是想强调这种模式应该尽量避免,Guava里面根本没有这种模式,而Redis只提供简单的读写操作,很容易就把系统实现成了这种方式。

应对分布式Cache宕机

如果是Cache直接挂了,那么就算是基于刷新的异步续费模式也无能为力了。这时A系统铁定无法对Cache进行存取操作,只能将流量完全打到B系统,B系统面对服务过载在劫难逃……

本节讨论的预防Cache宕机仅限于分布式Cache,因为本地Cache一般和A系统应用共享内存和进程,本地Cache挂了A系统也挂了,不会出现本地Cache挂了而A系统应用正常的情况。

首先,A系统请求线程检查分布式Cache状态,如果无应答则说明分布式Cache挂了,则转向请求B系统,这样一来大流量将压垮B系统。这时可选的方案如下:

  1. A系统的当前线程不请求B系统,而是打个日志并设置一个默认值。
  2. A系统的当前线程按照一定概率决定是否请求B系统。
  3. A系统的当前线程检查B系统运行情况,如果良好则请求B系统。

方案1最简单,A系统知道如果没有Cache,B系统可能扛不住自己的全部流量,索性不请求B系统,等待Cache恢复。但这时B系统利用率为0,显然不是最优方案,而且当请求的Value不容易设置默认值时,这个方案就不行了。

方案2可以让一部分线程请求B系统,这部分请求肯定能被B系统hold住。可以保守的设置这个概率 u =(B系统的平均流量)/(A系统的峰值流量)

方案3是一种更为智能的方案,如果B系统运行良好,当前线程请求;如果B系统过载,则不请求,这样A系统将让B系统处于一种宕机与不宕机的临界状态,最大限度挖掘B系统性能。这种方案要求B系统提供一个性能评估接口返回Yes和No,Yes表示B系统良好,可以请求;No表示B系统情况不妙,不要请求。这个接口将被频繁调用,必须高效。

方案3的关键在于如何评估一个系统的运行状况。一个系统中当前主机的性能参数有CPU负载、内存使用率、Swap使用率、GC频率和GC时间、各个接口平均响应时间等,性能评估接口需要根据这些参数返回Yes或者No,是不是机器学习里的二分类问题?😄关于这个问题已经可以单独写篇文章讨论了,在这里就不展开了,你可以想一个比较简单傻瓜的保守策略,缺点是A系统的请求无法很好的逼近B系统的性能极限。

综合以上分析,方案2比较靠谱。如果选择方案3,建议由专门团队负责研究并提供统一的系统性能实时评估方案和工具。

应对分布式Cache宕机后的恢复

不要以为成功hold住分布式Cache宕机就万事大吉了,真正的考验是分布式Cache从宕机过程恢复之后,这时分布式Cache中什么都没有。

即使是上文中提到了基于刷新的异步续费策略这时也没用,因为分布式Cache为空,无论如何都要请求B系统。这时B系统的最大流量是Key的空间取值数量。

如果Key的取值空间数量很少,则相安无事;如果Key的取值空间数量大于B系统的流量上限,服务过载依然在所难免。

这种情况A系统很难处理,关键原因是A系统请求Cache返回Key对应Value为空,A系统无法知道是因为当前Cache是刚刚初始化,所有内容都为空;还是因为仅仅是自己请求的那个Key没在Cache里

如果是前者,那么当前线程就要像处理Cache宕机那样进行某种策略的回避;如果是后者,直接请求B系统即可,因为这是正常的Cache使用流程。

对于Cache宕机的恢复,A系统真的无能为力,只能寄希望于B系统的方案了。

Server端的方案

相对于Client端需要应对各种复杂问题,Server端需要应对的问题非常简单,就是如何从容应对过载的问题。无论是缓存击穿也好,还是拒绝服务攻击也罢,对于Server端来说都是过载保护的问题。对于过载保护,主要给出两种可行方案,以及一种比较复杂的方案思路。

流量控制

流量控制就是B系统实时监控当前流量,如果超过预设的值或者系统承受能力,则直接拒绝掉一部分请求,以实现对系统的保护。

流量控制根据基于的数据不同,可分为两种:

  1. 基于流量阈值的流控:流量阈值是每个主机的流量上限,流量超过该阈值主机将进入不稳定状态。阈值提前进行设定,如果主机当前流量超过阈值,则拒绝掉一部分流量,使得实际被处理流量始终低于阈值。
  2. 基于主机状态的流控:每个接受每个请求之前先判断当前主机状态,如果主机状况不佳,则拒绝当前请求。

基于阈值的流控实现简单,但是最大的问题是需要提前设置阈值,而且随着业务逻辑越来越复杂,接口越来越多,主机的服务能力实际应该是下降的,这样就需要不断下调阈值,增加了维护成本,而且万一忘记调整的话,呵呵……

主机的阈值可以通过压力测试确定,选择的时候可以保守些。

基于主机状态的流控免去了人为控制,但是其最大的确定上文已经提到:如何根据当前主机各个参数判断主机状态呢?想要完美的回答这个问题目测并不容易,因此在没有太好答案之前,我推荐基于阈值的流控。

流量控制基于实现位置的不同,又可以分为两种:

  1. 反向代理实现流控:在反向代理如Nginx上基于各种策略进行流量控制。这种一般针对HTTP服务。
  2. 借助服务治理系统:如果Server端是RMI、RPC等服务,可以构建专门的服务治理系统进行负载均衡、流控等服务。
  3. 服务容器实现流控:在应用代码里,业务逻辑之前实现流量控制。

第3种在服务器的容器(如Java容器)中实现流控并不推荐,因为流控和业务代码混在一起容易混乱;其次实际上流量已经全量进入到了业务代码里,这时的流控只是阻止其进入真正的业务逻辑,所以流控效果将打折;还有,如果流量策略经常变动,系统将不得不为此经常更改。

因此,推荐前两种方式。

最后提一个注意点:当因为流控而拒绝请求时,务必在返回的数据中带上相关信息(比如“当前请求因为超出流量而被禁止访问”),如果返回值什么都没有将是一个大坑。因为造成调用方请求没有被响应的原因很多,可能是调用方Bug,也可能是服务方Bug,还可能是网络不稳定,这样一来很可能在排查一整天后发现是流控搞的鬼……

服务降级

服务降级一般由人为触发,属于服务过载造成崩溃恢复时的策略,但为了和流控对比,将其放到这里。

流量控制本质上是减小访问量,而服务处理能力不变;而服务降级本质上是降低了部分服务的处理能力,增强另一部分服务处理能力,而访问量不变。

服务降级是指在服务过载时关闭不重要的接口(直接拒绝处理请求),而保留重要的接口。比如服务由10个接口,服务降级时关闭了其中五个,保留五个,这时这个主机的服务处理能力将增强到二倍左右。

然而,服务过载发生时动辄就超出系统处理能力10倍,而服务降级能使主机服务处理能力提高10倍么?显然很困难,因此服务过载的应对不能只依靠服务降级策略。

动态扩展

动态扩展指的是在流量超过系统服务能力时,自动触发集群扩容,自动部署并上线运行;当流量过去后又自动回收多余机器,完全弹性。

这个方案是不是感觉很不错。但是目前互联网公司的在线应用跑在云上的本身就不多,要完全实现在线应用的自动化弹性运维,要走的路就更多了。

崩溃恢复

如果服务过载造成系统崩溃还是不幸发生了,这时需要运维控制流量,等后台系统启动完毕后循序渐进的放开流量,主要目的是让Cache慢慢预热。流量控制刚开始可以为10%,然后20%,然后50%,然后80%,最后全量,当然具体的比例,尤其是初始比例,还要看后端承受能力和前端流量的比例,各个系统并不相同。

如果后端系统有专门的工具进行Cache预热,则省去了运维的工作,等Cache热起来再发布后台系统即可。但是如果Cache中的Key空间很大,开发预热工具将比较困难。

结论

“防患于未然”放在服务过载的应对上也是适合的,预防为主,补救为辅。综合上文分析,具体的预防要点如下:

  1. 调用方(A系统)采用基于刷新的异步续费模式使用Cache,或者至少不能使用基于超时或刷新的简单(stupid)模式。
  2. 调用方(A系统)每次请求Cache时检查Cache是否可用(available),如果不可用则按照一个保守的概率访问后端,而不是无所顾忌的直接访问后端。
  3. 服务方(B系统)在反向代理处设置流量控制进行过载保护,阈值需要通过压测获得。

崩溃的补救主要还是靠运维和研发在发生时的通力合作:观察流量变化准确定位崩溃原因,运维控流量研发持续关注性能变化。

未来如果有条件的话可以研究下主机应用健康判断问题和动态弹性运维问题,毕竟自动化比人为操作要靠谱。

原文出自:https://tech.meituan.com/archives

消息队列设计精要

美团点评阅读(2665)

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。
本文不会一一介绍这些消息队列的所有特性,而是探讨一下自主开发设计一个消息队列时,你需要思考和设计的重要方面。过程中我们会参考这些成熟消息队列的很多重要思想。
本文首先会阐述什么时候你需要一个消息队列,然后以Push模型为主,从零开始分析设计一个消息队列时需要考虑到的问题,如RPC、高可用、顺序和重复消息、可靠投递、消费关系解析等。
也会分析以Kafka为代表的pull模型所具备的优点。最后是一些高级主题,如用批量/异步提高性能、pull模型的系统设计理念、存储子系统的设计、流量控制的设计、公平调度的实现等。其中最后四个方面会放在下篇讲解。

何时需要消息队列

当你需要使用消息队列时,首先需要考虑它的必要性。可以使用mq的场景有很多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。反之,如果需要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。

解耦

解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事务,只关心核心的流程。而需要依赖其他系统但不那么重要的事情,有通知即可,无需等待结果。换句话说,基于消息的模型,关心的是“通知”,而非“处理”。
比如在美团旅游,我们有一个产品中心,产品中心上游对接的是主站、移动后台、旅游供应链等各个数据源;下游对接的是筛选系统、API系统等展示系统。当上游的数据发生变更的时候,如果不使用消息系统,势必要调用我们的接口来更新数据,就特别依赖产品中心接口的稳定性和处理能力。但其实,作为旅游的产品中心,也许只有对于旅游自建供应链,产品中心更新成功才是他们关心的事情。而对于团购等外部系统,产品中心更新成功也好、失败也罢,并不是他们的职责所在。他们只需要保证在信息变更的时候通知到我们就好了。
而我们的下游,可能有更新索引、刷新缓存等一系列需求。对于产品中心来说,这也不是我们的职责所在。说白了,如果他们定时来拉取数据,也能保证数据的更新,只是实时性没有那么强。但使用接口方式去更新他们的数据,显然对于产品中心来说太过于“重量级”了,只需要发布一个产品ID变更的通知,由下游系统来处理,可能更为合理。
再举一个例子,对于我们的订单系统,订单最终支付成功之后可能需要给用户发送短信积分什么的,但其实这已经不是我们系统的核心流程了。如果外部系统速度偏慢(比如短信网关速度不好),那么主流程的时间会加长很多,用户肯定不希望点击支付过好几分钟才看到结果。那么我们只需要通知短信系统“我们支付成功了”,不一定非要等待它处理完成。

最终一致性

最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。
业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。
以一个银行的转账过程来理解最终一致性,转账的需求很简单,如果A系统扣钱成功,则B系统加钱一定成功。反之则一起回滚,像什么都没发生一样。
然而,这个过程中存在很多可能的意外:

  1. A扣钱成功,调用B加钱接口失败。
  2. A扣钱成功,调用B加钱接口虽然成功,但获取最终结果时网络异常引起超时。
  3. A扣钱成功,B加钱失败,A想回滚扣的钱,但A机器down机。

可见,想把这件看似简单的事真正做成,真的不那么容易。所有跨VM的一致性问题,从技术的角度讲通用的解决方案是:

  1. 强一致性,分布式事务,但落地太难且成本太高,后文会具体提到。
  2. 最终一致性,主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新搞一遍,直到成功为止。
    回到刚才的例子,系统在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。
    整个这个模型依然可以基于RPC来做,但可以抽象成一个统一的模型,基于消息队列来做一个“企业总线”。
    具体来说,本地事务维护业务变化和通知消息,一起落地(失败则一起回滚),然后RPC到达broker,在broker成功落地后,RPC返回成功,本地消息可以删除。否则本地消息一直靠定时任务轮询不断重发,这样就保证了消息可靠落地broker。
    broker往consumer发送消息的过程类似,一直发送消息,直到consumer发送消费成功确认。
    我们先不理会重复消息的问题,通过两次消息落地加补偿,下游是一定可以收到消息的。然后依赖状态机版本号等方式做判重,更新自己的业务,就实现了最终一致性。

最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。另外,所有不保证100%不丢消息的消息队列,理论上无法实现最终一致性。好吧,应该说理论上的100%,排除系统严重故障和bug。
像Kafka一类的设计,在设计层面上就有丢消息的可能(比如定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

广播

消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
比如本文开始提到的产品中心发布产品变更的消息,以及景点库很多去重更新的消息,可能“关心”方有很多个,但产品中心和景点库只需要发布变更消息即可,谁关心谁接入。

错峰与流控

试想上下游对于事情的处理能力是不同的。比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上前端。
这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。

总而言之,消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。
对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。
支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。
当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。
如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。

如何设计一个消息队列

综述

我们现在明确了消息队列的使用场景,下一步就是如何设计实现一个消息队列了。

基于消息的系统模型,不一定需要broker(消息队列服务端)。市面上的的Akka(actor模型)、ZeroMQ等,其实都是基于消息的系统设计范式,但是没有broker。
我们之所以要设计一个消息队列,并且配备broker,无外乎要做两件事情:

  1. 消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。
  2. 规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。
    掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次RPC做成两次RPC。发送者把消息投递到服务端(以下简称broker),服务端再将消息转发一手到接收端,就是这么简单。

一般来讲,设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。
利用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。
之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。
为了实现广播功能,我们必须要维护消费关系,可以利用zk/config server等保存消费关系。
在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。
下面我们会以设计消息队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,来具体分析设计实现一个消息队列时的方方面面。

实现队列基本功能

RPC通信协议

刚才讲到,所谓消息队列,无外乎两次RPC加一次转储,当然需要消费端最终做消费确认的情况是三次RPC。既然是RPC,就必然牵扯出一系列话题,什么负载均衡啊、服务发现啊、通信协议啊、序列化协议啊,等等。在这一块,我的强烈建议是不要重复造轮子。利用公司现有的RPC框架:Thrift也好,Dubbo也好,或者是其他自定义的框架也好。因为消息队列的RPC,和普通的RPC没有本质区别。当然了,自主利用Memchached或者Redis协议重新写一套RPC框架并非不可(如MetaQ使用了自己封装的Gecko NIO框架,卡夫卡也用了类似的协议)。但实现成本和难度无疑倍增。排除对效率的极端要求,都可以使用现成的RPC框架。
简单来讲,服务端提供两个RPC服务,一个用来接收消息,一个用来确认消息收到。并且做到不管哪个server收到消息和确认消息,结果一致即可。当然这中间可能还涉及跨IDC的服务的问题。这里和RPC的原则是一致的,尽量优先选择本机房投递。你可能会问,如果producer和consumer本身就在两个机房了,怎么办?首先,broker必须保证感知的到所有consumer的存在。其次,producer尽量选择就近的机房就好了。

高可用

其实所有的高可用,是依赖于RPC和存储的高可用来做的。先来看RPC的高可用,美团的基于MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服务自动发现,负载均衡等功能。而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,并且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。
那么怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息自然是幂等的。就算有单点故障,其他节点可以立刻顶上。另外failover可以依赖定时任务的补偿,这是消息队列本身天然就可以支持的功能。存储系统本身的可用性我们不需要操太多心,放心大胆的交给DBA们吧!
对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。需要保证每一个分区内的高可用性,也就是每一个分区至少要有一个主备且需要做数据的同步,关于这块HA的细节,可以参考下篇pull模型消息系统设计。

服务端承载消息堆积的能力

消息到达服务端如果不经过任何处理就到接收者了,broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择时机投递就显得是顺理成章的了。
只是这个存储可以做成很多方式。比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。
但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。
市面上的消息队列普遍两种形式都支持。当然具体的场景还要具体结合公司的业务来看。

存储子系统的选择

我们来看看如果需要数据落地的情况下各种存储子系统的选择。理论上,从速度来看,文件系统>分布式KV(持久化)>分布式文件系统>数据库,而可靠性却截然相反。还是要从支持的业务场景出发作出最合理的选择,如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择。
但是DB受制于IOPS,如果要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。整体上可以采用数据文件+索引文件的方式处理,具体这块的设计比较复杂,可以参考下篇的存储子系统设计。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其编程接口较友好,性能也比较可观,如果在可靠性要求不是那么高的场景,也不失为一个不错的选择。

消费关系解析

现在我们的消息队列初步具备了转储消息的能力。下面一个重要的事情就是解析发送接收关系,进行正确的消息投递了。
市面上的消息队列定义了一堆让人晕头转向的名词,如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。抛开现象看本质,无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。当然,对于互联网的大部分应用来说,组间广播、组内单播是最常见的情形。
消息需要通知到多个业务集群,而一个业务集群内有很多台机器,只要一台机器消费这个消息就可以了。
当然这不是绝对的,很多时候组内的广播也是有适用场景的,如本地缓存的更新等等。另外,消费关系除了组内组间,可能会有多级树状关系。这种情况太过于复杂,一般不列入考虑范围。所以,一般比较通用的设计是支持组间广播,不同的组注册不同的订阅。组内的不同机器,如果注册一个相同的ID,则单播;如果注册不同的ID(如IP地址+端口),则广播。
至于广播关系的维护,一般由于消息队列本身都是集群,所以都维护在公共存储上,如config server、zookeeper等。维护广播关系所要做的事情基本是一致的:

  1. 发送关系的维护。
  2. 发送关系变更时的通知。

队列高级特性设计

上面都是些消息队列基本功能的实现,下面来看一些关于消息队列特性相关的内容,不管可靠投递/消息丢失与重复以及事务乃至于性能,不是每个消息队列都会照顾到,所以要依照业务的需求,来仔细衡量各种特性实现的成本,利弊,最终做出最为合理的设计。

可靠投递(最终一致性)

这是个激动人心的话题,完全不丢消息,究竟可不可能?答案是,完全可能,前提是消息可能会重复,并且,在异常情况下,要接受消息的延迟。
方案说简单也简单,就是每当要发生不可靠的事情(RPC等)之前,先将消息落地,然后发送。当失败或者不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。
具体来说:

  1. producer往broker发送消息之前,需要做一次落地。
  2. 请求到server后,server确保数据落地后再告诉客户端发送成功。
  3. 支持广播的消息队列需要对每个待发送的endpoint,持久化一个发送状态,直到所有endpoint状态都OK才可删除消息。

对于各种不确定(超时、down机、消息没有送达、送达后数据没落地、数据落地了回复没收到),其实对于发送方来说,都是一件事情,就是消息没有送达。
重推消息所面临的问题就是消息重复。重复和丢失就像两个噩梦,你必须要面对一个。好在消息重复还有处理的机会,消息丢失再想找回就难了。
Anyway,作为一个成熟的消息队列,应该尽量在各个环节减少重复投递的可能性,不能因为重复有解决方案就放纵的乱投递。
最后说一句,不是所有的系统都要求最终一致性或者可靠投递,比如一个论坛系统、一个招聘系统。一个重复的简历或话题被发布,可能比丢失了一个发布显得更让用户无法接受。不断重复一句话,任何基础组件要服务于业务场景。

消费确认

当broker把消息投递给消费者后,消费者可以立即响应我收到了这个消息。但收到了这个消息只是第一步,我能不能处理这个消息却不一定。或许因为消费能力的问题,系统的负荷已经不能处理这个消息;或者是刚才状态机里面提到的消息不是我想要接收的消息,主动要求重发。
把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。所以,允许消费者主动进行消费确认是必要的。当然,对于没有特殊逻辑的消息,默认Auto Ack也是可以的,但一定要允许消费方主动ack。
对于正确消费ack的,没什么特殊的。但是对于reject和error,需要特别说明。reject这件事情,往往业务方是无法感知到的,系统的流量和健康状况的评估,以及处理能力的评估是一件非常复杂的事情。举个极端的例子,收到一个消息开始build索引,可能这个消息要处理半个小时,但消息量却是非常的小。所以reject这块建议做成滑动窗口/线程池类似的模型来控制,
消费能力不匹配的时候,直接拒绝,过一段时间重发,减少业务的负担。
但业务出错这件事情是只有业务方自己知道的,就像上文提到的状态机等等。这时应该允许业务方主动ack error,并可以与broker约定下次投递的时间。

重复消息和顺序消息

上文谈到重复消息是不可能100%避免的,除非可以允许丢失,那么,顺序消息能否100%满足呢? 答案是可以,但条件更为苛刻:

  1. 允许消息丢失。
  2. 从发送方到服务方到接受者都是单点单线程。

所以绝对的顺序消息基本上是不能实现的,当然在METAQ/Kafka等pull模型的消息队列中,单线程生产/消费,排除消息丢失,也是一种顺序消息的解决方案。
一般来讲,一个主流消息队列的设计范式里,应该是不丢消息的前提下,尽量减少重复消息,不保证消息的投递顺序。
谈到重复消息,主要是两个话题:

  1. 如何鉴别消息重复,并幂等的处理重复消息。
  2. 一个消息队列如何尽量减少重复消息的投递。

先来看看第一个话题,每一个消息应该有它的唯一身份。不管是业务方自定义的,还是根据IP/PID/时间戳生成的MessageId,如果有地方记录这个MessageId,消息到来是能够进行比对就
能完成重复的鉴定。数据库的唯一键/bloom filter/分布式KV中的key,都是不错的选择。由于消息不能被永久存储,所以理论上都存在消息从持久化存储移除的瞬间上游还在投递的可能(上游因种种原因投递失败,不停重试,都到了下游清理消息的时间)。这种事情都是异常情况下才会发生的,毕竟是小众情况。两分钟消息都还没送达,多送一次又能怎样呢?幂等的处理消息是一门艺术,因为种种原因重复消息或者错乱的消息还是来到了,说两种通用的解决方案:

  1. 版本号。
  2. 状态机。
版本号

举个简单的例子,一个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,如果不做重复性判断,显然最终状态是错误的。
但是,如果每个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。
每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另一条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。
如果业务方只关心消息重复不重复,那么问题就已经解决了。但很多时候另一个头疼的问题来了,就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则最后会发生状态错误。
参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。
如果到来的顺序是21,则先把2存起来,待1到来后,先处理1,再处理2,这样重复性和顺序性要求就都达到了。

状态机

基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:

  1. 对发送方必须要求消息带业务版本号。
  2. 下游必须存储消息的版本号,对于要严格保证顺序的。

还不能只存储最新的版本号的消息,要把乱序到来的消息都存储起来。而且必须要对此做出处理。试想一个永不过期的”session”,比如一个物品的状态,会不停流转于上下线。那么中间环节的所有存储
就必须保留,直到在某个版本号之前的版本一个不丢的到来,成本太高。
就刚才的场景看,如果消息没有版本号,该怎么解决呢?业务方只需要自己维护一个状态机,定义各种状态的流转关系。例如,”下线”状态只允许接收”上线”消息,“上线”状态只能接收“下线消息”,如果上线收到上线消息,或者下线收到下线消息,在消息不丢失和上游业务正确的前提下。要么是消息发重了,要么是顺序到达反了。这时消费者只需要把“我不能处理这个消息”告诉投递者,要求投递者过一段时间重发即可。而且重发一定要有次数限制,比如5次,避免死循环,就解决了。
举例子说明,假设产品本身状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态变成了3123。
那么下游收到3消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息1,发现是上线->上线,拒绝接收,要求重发。然后收到消息2,状态是上线->下线,于是接收这个消息。
此时无论重发的消息1或者3到来,还是可以接收。另外的重发,在一定次数拒绝后停止重发,业务正确。

中间件对于重复消息的处理

回归到消息队列的话题来讲。上述通用的版本号/状态机/ID判重解决方案里,哪些是消息队列该做的、哪些是消息队列不该做业务方处理的呢?其实这里没有一个完全严格的定义,但回到我们的出发点,我们保证不丢失消息的情况下尽量少重复消息,消费顺序不保证。那么重复消息下和乱序消息下业务的正确,应该是由消费方保证的,我们要做的是减少消息发送的重复。
我们无法定义业务方的业务版本号/状态机,如果API里强制需要指定版本号,则显得过于绑架客户了。况且,在消费方维护这么多状态,就涉及到一个消费方的消息落地/多机间的同步消费状态问题,复杂度指数级上升,而且只能解决部分问题。
减少重复消息的关键步骤:

  1. broker记录MessageId,直到投递成功后清除,重复的ID到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在server端产生重复消息。
  2. 对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问无果,再重发。

事务

持久性是事务的一个特性,然而只满足持久性却不一定能满足事务的特性。还是拿扣钱/加钱的例子讲。满足事务的一致性特征,则必须要么都不进行,要么都能成功。
解决方案从大方向上有两种:

  1. 两阶段提交,分布式事务。
  2. 本地事务,本地落地,补偿发送。

分布式事务存在的最大问题是成本太高,两阶段提交协议,对于仲裁down机或者单点故障,几乎是一个无解的黑洞。对于交易密集型或者I/O密集型的应用,没有办法承受这么高的网络延迟,系统复杂性。
并且成熟的分布式事务一定构建与比较靠谱的商用DB和商用中间件上,成本也太高。
那如何使用本地事务解决分布式事务的问题呢?以本地和业务在一个数据库实例中建表为例子,与扣钱的业务操作同一个事务里,将消息插入本地数据库。如果消息入库失败,则业务回滚;如果消息入库成功,事务提交。
然后发送消息(注意这里可以实时发送,不需要等定时任务检出,以提高消息实时性)。以后的问题就是前文的最终一致性问题所提到的了,只要消息没有发送成功,就一直靠定时任务重试。
这里有一个关键的点,本地事务做的,是业务落地和消息落地的事务,而不是业务落地和RPC成功的事务。这里很多人容易混淆,如果是后者,无疑是事务嵌套RPC,是大忌,会有长事务死锁等各种风险。
而消息只要成功落地,很大程度上就没有丢失的风险(磁盘物理损坏除外)。而消息只要投递到服务端确认后本地才做删除,就完成了producer->broker的可靠投递,并且当消息存储异常时,业务也是可以回滚的。
本地事务存在两个最大的使用障碍:

  1. 配置较为复杂,“绑架”业务方,必须本地数据库实例提供一个库表。
  2. 对于消息延迟高敏感的业务不适用。

话说回来,不是每个业务都需要强事务的。扣钱和加钱需要事务保证,但下单和生成短信却不需要事务,不能因为要求发短信的消息存储投递失败而要求下单业务回滚。所以,一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。另外事务的使用应该尽量低成本、透明化,可以依托于现有的成熟框架,如Spring的声明式事务做扩展。业务方只需要使用@Transactional标签即可。

性能相关

异步/同步

首先澄清一个概念,异步,同步和oneway是三件事。异步,归根结底你还是需要关心结果的,但可能不是当时的时间点关心,可以用轮询或者回调等方式处理结果;同步是需要当时关心
的结果的;而oneway是发出去就不管死活的方式,这种对于某些完全对可靠性没有要求的场景还是适用的,但不是我们重点讨论的范畴。
回归来看,任何的RPC都是存在客户端异步与服务端异步的,而且是可以任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步。
对于客户端来说,同步与异步主要是拿到一个Result,还是Future(Listenable)的区别。实现方式可以是线程池,NIO或者其他事件机制,这里先不展开讲。
服务端异步可能稍微难理解一点,这个是需要RPC协议支持的。参考servlet 3.0规范,服务端可以吐一个future给客户端,并且在future done的时候通知客户端。
整个过程可以参考下面的代码:

客户端同步服务端异步。

Future<Result> future = request(server);//server立刻返回future
synchronized(future){
while(!future.isDone()){
   future.wait();//server处理结束后会notify这个future,并修改isdone标志
}
}
return future.get();

客户端同步服务端同步。

Result result = request(server);

客户端异步服务端同步(这里用线程池的方式)。

Future<Result> future = executor.submit(new Callable(){public void call<Result>(){
    result = request(server);
}})
return future;

客户端异步服务端异步。

Future<Result> future = request(server);//server立刻返回future

return future

上面说了这么多,其实是想让大家脱离两个误区:

  1. RPC只有客户端能做异步,服务端不能。
  2. 异步只能通过线程池。

那么,服务端使用异步最大的好处是什么呢?说到底,是解放了线程和I/O。试想服务端有一堆I/O等待处理,如果每个请求都需要同步响应,每条消息都需要结果立刻返回,那么就几乎没法做I/O合并
(当然接口可以设计成batch的,但可能batch发过来的仍然数量较少)。而如果用异步的方式返回给客户端future,就可以有机会进行I/O的合并,把几个批次发过来的消息一起落地(这种合并对于MySQL等允许batch insert的数据库效果尤其明显),并且彻底释放了线程。不至于说来多少请求开多少线程,能够支持的并发量直线提高。
来看第二个误区,返回future的方式不一定只有线程池。换句话说,可以在线程池里面进行同步操作,也可以进行异步操作,也可以不使用线程池使用异步操作(NIO、事件)。
回到消息队列的议题上,我们当然不希望消息的发送阻塞主流程(前面提到了,server端如果使用异步模型,则可能因消息合并带来一定程度上的消息延迟),所以可以先使用线程池提交一个发送请求,主流程继续往下走。
但是线程池中的请求关心结果吗?Of course,必须等待服务端消息成功落地,才算是消息发送成功。所以这里的模型,准确地说事客户端半同步半异步(使用线程池不阻塞主流程,但线程池中的任务需要等待server端的返回),server端是纯异步。客户端的线程池wait在server端吐回的future上,直到server端处理完毕,才解除阻塞继续进行。
总结一句,同步能够保证结果,异步能够保证效率,要合理的结合才能做到最好的效率。

批量

谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。大处着眼来看,消费动作都是事件驱动的。主要事件包括:

  1. 攒够了一定数量。
  2. 到达了一定时间。
  3. 队列里有新的数据到来。

对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。
在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。伪代码如下:

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息
   }
});
public void send(Message message){
    queue.offer(message);
    executor.submit(task)
}

这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。延迟的最高程度由上一次发送的等待时间决定。但可能造成的问题是发送过快的话批量的大小不够满足性能的极致。

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
volatile long last = System.currentMills();
Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){
   flush();
},500,500,TimeUnits.MILLS);
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,顾做成全局的。
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。
   }
});
public void send(Message message){
    last = System.currentMills();
    queue.offer(message);
    flush();
}
private void flush(){
 if(queue.size>200||System.currentMills()-last>200){
       executor.submit(task)
  }
}

相反对于可以用适量的延迟来换取高性能的场景来说,用定时/定量二选一的方式可能会更为理想,既到达一定数量才发送,但如果数量一直达不到,也不能干等,有一个时间上限。
具体说来,在上文的submit之前,多判断一个时间和数量,并且Runnable内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。对于server端的数据落地,使用这种方式就非常方便。

最后啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?主要原因有两个:

  1. 减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
  2. 减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。

push还是pull

上文提到的消息队列,大多是针对push模型的设计。现在市面上有很多经典的也比较成熟的pull模型的消息队列,如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别,可谓另辟蹊径。
我们简要分析下push和pull模型各自存在的利弊。

慢消费

慢消费无疑是push模型最大的致命伤,穿成流水线来看,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。
反观pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于建立索引等慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。

消息延迟与忙等

这是pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待一段时间重新pull。
但等待多久就很难判定了。你可能会说,我可以有xx动态pull取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息,然后半个小时没有新消息产生,
可能你的算法算出下次最有可能到来的时间点是31分钟之后,或者60分钟之后,结果下条消息10分钟后到了,是不是很让人沮丧?
当然也不是说延迟就没有解决方案了,业界较成熟的做法是从短时间开始(不会对broker有太大负担),然后指数级增长等待。比如开始等5ms,然后10ms,然后20ms,然后40ms……直到有消息到来,然后再回到5ms。
即使这样,依然存在延迟问题:假设40ms到80ms之间的50ms消息到来,消息就延迟了30ms,而且对于半个小时来一次的消息,这些开销就是白白浪费的。
在阿里的RocketMq里,有一种优化的做法-长轮询,来平衡推拉模型各自的缺点。基本思路是:消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好~

顺序消息

如果push模式的消息队列,支持分区,单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能push送另外一个消息,还要发送者保证全局顺序唯一,听起来也能做顺序消息,但成本太高了,尤其是必须每个消息消费确认后才能发下一条消息,这对于本身堆积能力和慢消费就是瓶颈的push模式的消息队列,简直是一场灾难。
反观pull模式,如果想做到全局顺序消息,就相对容易很多:

  1. producer对应partition,并且单线程。
  2. consumer对应partition,消费确认(或批量确认),继续消费即可。

所以对于日志push送这种最好全局有序,但允许出现小误差的场景,pull模式非常合适。如果你不想看到通篇乱套的日志~~
Anyway,需要顺序消息的场景还是比较有限的而且成本太高,请慎重考虑。

总结

本文从为何使用消息队列开始讲起,然后主要介绍了如何从零开始设计一个消息队列,包括RPC、事务、最终一致性、广播、消息确认等关键问题。并对消息队列的push、pull模型做了简要分析,最后从批量和异步角度,分析了消息队列性能优化的思路。下篇会着重介绍一些高级话题,如存储系统的设计、流控和错峰的设计、公平调度等。希望通过这些,让大家对消息队列有个提纲挈领的整体认识,并给自主开发消息队列提供思路。另外,本文主要是源自自己在开发消息队列中的思考和读源码时的体会,比较不”官方”,也难免会存在一些漏洞,欢迎大家多多交流。

后续我们还会推出消息队列设计高级篇,内容会涵盖以下方面:

  • pull模型消息系统设计理念
  • 存储子系统设计
  • 流量控制
  • 公平调度

敬请期待哦~

作者简介

王烨,现在是美团旅游后台研发组的程序猿,之前曾经在百度、去哪和优酷工作过,专注Java后台开发。对于网络编程和并发编程具有浓厚的兴趣,曾经做过一些基础组件,也翻过一些源码,属于比较典型的宅男技术控。期待能够与更多知己,在coding的路上并肩前行~

原文出自:https://tech.meituan.com/archives

美团外卖订单中心的演进

美团点评阅读(2711)

前言

美团外卖从2013年9月成交第一单以来,已走过了三个年头。期间,业务飞速发展,美团外卖由日均几单发展为日均500万单(9月11日已突破600万)的大型O2O互联网外卖服务平台。平台支持的品类也由最初外卖单品拓展为全品类。

随着订单量的增长、业务复杂度的提升,外卖订单系统也在不断演变进化,从早期一个订单业务模块到现在分布式可扩展的高性能、高可用、高稳定订单系统。整个发展过程中,订单系统经历了几个明显的阶段,下面本篇文章将为大家介绍一下订单系统的演进过程,重点关注各阶段的业务特征、挑战及应对之道。

为方便大家更好地了解整个演进过程,我们首先看一下外卖业务。

外卖订单业务

外卖订单业务是一个需要即时送的业务,对实时性要求很高。从用户订餐到最终送达用户,一般在1小时内。如果最终送达用户时间变长,会带来槽糕的用户体验。在1小时内,订单会快速经过多个阶段,直到最终送达用户。各个阶段需要紧密配合,确保订单顺利完成。

下图是一个用户视角的订单流程图。

订单流程图

从普通用户的角度来看,一个外卖订单从下单后,会经历支付、商家接单、配送、用户收货、售后及订单完成多个阶段。以技术的视角来分解的话,每个阶段依赖于多个子服务来共同完成,比如下单会依赖于购物车、订单预览、确认订单服务,这些子服务又会依赖于底层基础系统来完成其功能。

外卖业务另一个重要特征是一天内订单量会规律变化,订单会集中在中午、晚上两个“饭点”附近,而其它时间的订单量较少。这样,饭点附近系统压力会相对较大。

下图是一天内的外卖订单量分布图

订单量分布图

总结而言,外卖业务具有如下特征:

  • 流程较长且实时性要求高;
  • 订单量高且集中。

下面将按时间脉络为大家讲解订单系统经历的各个阶段、各阶段业务特征、挑战以及应对之道。

订单系统雏型

外卖业务发展早期,第一目标是要能够快速验证业务的可行性。技术上,我们需要保证架构足够灵活、快速迭代从而满足业务快速试错的需求。

在这个阶段,我们将订单相关功能组织成模块,与其它模块(门店模块等)一起形成公用jar包,然后各个系统通过引入jar包来使用订单功能。

早期系统的整体架构图如下所示:

订单系统雏型

早期,外卖整体架构简单、灵活,公共业务逻辑通过jar包实现后集成到各端应用,应用开发部署相对简单。比较适合业务早期逻辑简单、业务量较小、需要快速迭代的情况。但是,随着业务逻辑的复杂、业务量的增长,单应用架构的弊端逐步暴露出来。系统复杂后,大家共用一个大项目进行开发部署,协调的成本变高;业务之间相互影响的问题也逐渐增多。

早期业务处于不断试错、快速变化、快速迭代阶段,通过上述架构,我们能紧跟业务,快速满足业务需求。随着业务的发展以及业务的逐步成熟,我们对系统进行逐步升级,从而更好地支持业务。

独立的订单系统

2014年4月,外卖订单量达到了10万单/日,而且订单量还在持续增长。这时候,业务大框架基本成型,业务在大框架基础上快速迭代。大家共用一个大项目进行开发部署,相互影响,协调成本变高;多个业务部署于同一VM,相互影响的情况也在增多。

为解决开发、部署、运行时相互影响的问题。我们将订单系统进行独立拆分,从而独立开发、部署、运行,避免受其它业务影响。

系统拆分主要有如下几个原则:

  • 相关业务拆分独立系统;
  • 优先级一致的业务拆分独立系统;
  • 拆分系统包括业务服务和数据。

基于以上原则,我们将订单系统进行独立拆分,所有订单服务通过RPC接口提供给外部使用。订单系统内部,我们将功能按优先级拆分为不同子系统,避免相互影响。订单系统通过MQ(队列)消息,通知外部订单状态变更。

独立拆分后的订单系统架构如下所示:

独立拆分后的订单系统

其中,最底层是数据存储层,订单相关数据独立存储。订单服务层,我们按照优先级将订单服务划分为三个系统,分别为交易系统、查询系统、异步处理系统。

独立拆分后,可以避免业务间的相互影响。快速支持业务迭代需求的同时,保障系统稳定性。

高性能、高可用、高稳定的订单系统

订单系统经过上述独立拆分后,有效地避免了业务间的相互干扰,保障迭代速度的同时,保证了系统稳定性。这时,我们的订单量突破百万,而且还在持续增长。之前的一些小问题,在订单量增加后,被放大,进而影响用户体验。比如,用户支付成功后,极端情况下(比如网络、数据库问题)会导致支付成功消息处理失败,用户支付成功后依然显示未支付。订单量变大后,问题订单相应增多。我们需要提高系统的可靠性,保证订单功能稳定可用。

另外,随着订单量的增长、订单业务的复杂,对订单系统的性能、稳定性、可用性等提出了更高的要求。

为了提供更加稳定、可靠的订单服务,我们对拆分后的订单系统进行进一步升级。下面将分别介绍升级涉及的主要内容。

性能优化

系统独立拆分后,可以方便地对订单系统进行优化升级。我们对独立拆分后的订单系统进行了很多的性能优化工作,提升服务整体性能,优化工作主要涉及如下几个方面。

异步化

服务所需要处理的工作越少,其性能自然越高。可以通过将部分操作异步化来减少需要同步进行的操作,进而提升服务的性能。异步化有两种方案。

  • 线程或线程池:将异步操作放在单独线程中处理,避免阻塞服务线程;
  • 消息异步:异步操作通过接收消息完成。

异步化带来一个隐患,如何保障异步操作的执行。这个场景主要发生在应用重启时,对于通过线程或线程池进行的异步化,JVM重启时,后台执行的异步操作可能尚未完成。这时,需要通过JVM优雅关闭来保证异步操作进行完成后,JVM再关闭。通过消息来进行的,消息本身已提供持久化,不受应用重启影响。

具体到订单系统,我们通过将部分不必同步进行的操作异步化,来提升对外服务接口的性能。不需要立即生效的操作即可以异步进行,比如发放红包、PUSH推送、统计等。

以订单配送PUSH推送为例,将PUSH推送异步化后的处理流程变更如下所示:

异步化

PUSH异步化后,线程#1在更新订单状态、发送消息后立即返回,而不用同步等待PUSH推送完成。而PUSH推送异步在线程#2中完成。

并行化

操作并行化也是提升性能的一大利器,并行化将原本串行的工作并行执行,降低整体处理时间。我们对所有订单服务进行分析,将其中非相互依赖的操作并行化,从而提升整体的响应时间。

以用户下单为例,第一步是从各个依赖服务获取信息,包括门店、菜品、用户信息等。获取这些信息并不需要相互依赖,故可以将其并行化,并行后的处理流程变更如下所示:

并行化

通过将获取信息并行化,可有效缩短下单时间,提升下单接口性能。

缓存

通过将统计信息进行提前计算后缓存,避免获取数据时进行实时计算,从而提升获取统计数据的服务性能。比如对于首单、用户已减免配送费等,通过提前计算后缓存,可以简化实时获取数据逻辑,节约时间。

以用户已减免配送费为例,如果需要实时计算,则需要取到用户所有订单后,再进行计算,这样实时计算成本较高。我们通过提前计算,缓存用户已减免配送费。需要取用户已减免配送费时,从缓存中取即可,不必实时计算。具体来说,包括如下几点:

  • 通过缓存保存用户已减免配送费;
  • 用户下单时,如果订单有减免配送费,增加缓存中用户减免配送费金额(异步进行);
  • 订单取消时,如果订单有减免配送费,减少缓存中用户减免配送费金额(异步进行);

一致性优化

订单系统涉及交易,需要保证数据的一致性。否则,一旦出现问题,可能会导致订单不能及时配送、交易金额不对等。

交易一个很重要的特征是其操作具有事务性,订单系统是一个复杂的分布式系统,比如支付涉及订单系统、支付平台、支付宝/网银等第三方。仅通过传统的数据库事务来保障不太可行。对于订单交易系统的事务性,并不要求严格满足传统数据库事务的ACID性质,只需要最终结果一致即可。针对订单系统的特征,我们通过如下种方式来保障最终结果的一致性。

重试/幂等

通过延时重试,保证操作最终会最执行。比如退款操作,如退款时遇到网络或支付平台故障等问题,会延时进行重试,保证退款最终会被完成。重试又会带来另一个问题,即部分操作重复进行,需要对操作进行幂等处理,保证重试的正确性。

以退款操作为例,加入重试/幂等后的处理流程如下所示:

引入重试的退款流程

退款操作首先会检查是否已经退款,如果已经退款,直接返回。否则,向支付平台发起退款,从而保证操作幂等,避免重复操作带来问题。如果发起退款失败(比如网络或支付平台故障),会将任务放入延时队列,稍后重试。否则,直接返回。

通过重试+幂等,可以保证退款操作最终一定会完成。

2PC

2PC是指分布式事务的两阶段提交,通过2PC来保证多个系统的数据一致性。比如下单过程中,涉及库存、优惠资格等多个资源,下单时会首先预占资源(对应2PC的第一阶段),下单失败后会释放资源(对应2PC的回滚阶段),成功后会使用资源(对应2PC的提交阶段)。对于2PC,网上有大量的说明,这里不再继续展开。

高可用

分布式系统的可用性由其各个组件的可用性共同决定,要提升分布式系统的可用性,需要综合提升组成分布式系统的各个组件的可用性。

针对订单系统而言,其主要组成组件包括三类:存储层、中间件层、服务层。下面将分层说明订单系统的可用性。

存储层

存储层的组件如MySQL、ES等本身已经实现了高可用,比如MySQL通过主从集群、ES通过分片复制来实现高可用。存储层的高可用依赖各个存储组件即可。

中间件层

分布式系统会大量用到各类中间件,比如服务调用框架等,这类中间件一般使用开源产品或由公司基础平台提供,本身已具备高可用。

服务层

在分布式系统中,服务间通过相互调用来完成业务功能,一旦某个服务出现问题,会级联影响调用方服务,进而导致系统崩溃。分布式系统中的依赖容灾是影响服务高可用的一个重要方面。

依赖容灾主要有如下几个思路:

  • 依赖超时设置;
  • 依赖灾备;
  • 依赖降级;
  • 限制依赖使用资源;

订单系统会依赖多个其它服务,也存在这个问题。当前订单系统通过同时采用上述四种方法,来避免底层服务出现问题时,影响整体服务。具体实现上,我们采用Hystrix框架来完成依赖容灾功能。Hystrix框架采用上述四种方法,有效实现依赖容灾。订单系统依赖容灾示意图如下所示

资源软隔离

通过为每个依赖服务设置独立的线程池、合理的超时时间及出错时回退方法,有效避免服务出现问题时,级联影响,导致整体服务不可用,从而实现服务高可用。

另外,订单系统服务层都是无状态服务,通过集群+多机房部署,可以避免单点问题及机房故障,实现高可用。

小结

上面都是通过架构、技术实现层面来保障订单系统的性能、稳定性、可用性。实际中,有很多的事故是人为原因导致的,除了好的架构、技术实现外,通过规范、制度来规避人为事故也是保障性能、稳定性、可用性的重要方面。订单系统通过完善需求review、方案评审、代码review、测试上线、后续跟进流程来避免人为因素影响订单系统稳定性。

通过以上措施,我们将订单系统建设成了一个高性能、高稳定、高可用的分布式系统。其中,交易系统tp99为150ms、查询系统tp99时间为40ms。整体系统可用性为6个9。

可扩展的订单系统

订单系统经过上面介绍的整体升级后,已经是一个高性能、高稳定、高可用的分布式系统。但是系统的的可扩展性还存在一定问题,部分服务只能通过垂直扩展(增加服务器配置)而不能通过水平扩展(加机器)来进行扩容。但是,服务器配置有上限,导致服务整体容量受到限制。

到2015年5月的时候,这个问题就比较突出了。当时,数据库服务器写接近单机上限。业务预期还会继续快速增长。为保障业务的快速增长,我们对订单系统开始进行第二次升级。目标是保证系统有足够的扩展性,从而支撑业务的快速发展。

分布式系统的扩展性依赖于分布式系统中各个组件的可扩展性,针对订单系统而言,其主要组成组件包括三类:存储层、中间件层、服务层。下面将分层说明如何提高各层的可扩展性。

存储层

订单系统存储层主要依赖于MySQL持久化、tair/redis cluster缓存。tair/redis cluster缓存本身即提供了很好的扩展性。MySQL可以通过增加从库来解决读扩展问题。但是,对于写MySQL存在单机容量的限制。另外,数据库的整体容量受限于单机硬盘的限制。

存储层的可扩展性改造主要是对MySQL扩展性改造。

  • 分库分表

写容量限制是受限于MySQL数据库单机处理能力限制。如果能将数据拆为多份,不同数据放在不同机器上,就可以方便对容量进行扩展。

对数据进行拆分一般分为两步,第一步是分库,即将不同表放不同库不同机器上。经过第一步分库后,容量得到一定提升。但是,分库并不能解决单表容量超过单机限制的问题,随着业务的发展,订单系统中的订单表即遇到了这个问题。

针对订单表超过单库容量的问题,需要进行分表操作,即将订单表数据进行拆分。单表数据拆分后,解决了写的问题,但是如果查询数据不在同一个分片,会带来查询效率的问题(需要聚合多张表)。由于外卖在线业务对实时性、性能要求较高。我们针对每个主要的查询维度均保存一份数据(每份数据按查询维度进行分片),方便查询。

具体来说,外卖主要涉及三个查询维度:订单ID、用户ID、门店ID。对订单表分表时,对于一个订单,我们存三份,分别按照订单ID、用户ID、 门店ID以一定规则存储在每个维度不同分片中。这样,可以分散写压力,同时,按照订单ID、用户ID、门店ID三个维度查询时,数据均在一个分片,保证较高的查询效率。

订单表分表后,订单表的存储架构如下所示:

分库分表

可以看到,分表后,每个维度共有100张表,分别放在4个库上面。对于同一个订单,冗余存储了三份。未来,随着业务发展,还可以继续通过将表分到不同机器上来持续获得容量的提升。

分库分表后,订单数据存储到多个库多个表中,为应用层查询带来一定麻烦,解决分库分表后的查询主要有三种方案:

  • MySQL服务器端支持:目前不支持。
  • 中间件。
  • 应用层。

由于MySQL服务器端不能支持,我们只剩下中间件和应用层两个方案。中间件方案对应用透明,但是开发难度相对较大,当时这块没有资源去支持。于是,我们采用应用层方案来快速支持。结合应用开发框架(SPRING+MYBATIS),我们实现了一个轻量级的分库分表访问插件,避免将分库分表逻辑嵌入到业务代码。分库分表插件的实现包括如下几个要点。

  • 配置文件管理分库分表配置信息;
  • JAVA注解说明SQL语句分库分表信息;
  • JAVA AOP解析注解+查询配置文件,获取数据源及表名;
  • MYBATIS动态替换表名;
  • SPRING动态替换数据源。

通过分库分表,解决了写容量扩展问题。但是分表后,会给查询带来一定的限制,只能支持主要维度的查询,其它维度的查询效率存在问题。

  • ES搜索

订单表分表之后,对于ID、用户ID、门店ID外的查询(比如按照手机号前缀查询)存在效率问题。这部分通常是复杂查询,可以通过全文搜索来支持。在订单系统中,我们通过ES来解决分表后非分表维度的复杂查询效率问题。具体来说,使用ES,主要涉及如下几点。

  • 通过databus将订单数据同步到ES。
  • 同步数据时,通过批量写入来降低ES写入压力。
  • 通过ES的分片机制来支持扩展性。

小结

通过对存储层的可扩展性改造,使得订单系统存储层具有较好的可扩展性。对于中间层的可扩展性与上面提到的中间层可用性一样,中间层本身已提供解决方案,直接复用即可。对于服务层,订单系统服务层提供的都是无状态服务,对于无状态服务,通过增加机器,即可获得更高的容量,完成扩容。

通过对订单系统各层可扩展性改造,使得订单系统具备了较好的可扩展性,能够支持业务的持续发展,当前,订单系统已具体千万单/日的容量。

上面几部分都是在介绍如何通过架构、技术实现等手段来搭建一个可靠、完善的订单系统。但是,要保障系统的持续健康运行,光搭建系统还不够,运维也是很重要的一环。

智能运维的订单系统

早期,对系统及业务的运维主要是采用人肉的方式,即外部反馈问题,RD通过排查日志等来定位问题。随着系统的复杂、业务的增长,问题排查难度不断加大,同时反馈问题的数量也在逐步增多。通过人肉方式效率偏低,并不能很好的满足业务的需求。

为提升运维效率、降低人力成本,我们对系统及业务运维进行自动化、智能化改进,改进包括事前、事中、事后措施。

  • 事前措施

事前措施的目的是为提前发现隐患,提前解决,避免问题恶化。

在事前措施这块,我们主要采取如下几个手段:

  1. 定期线上压测:通过线上压测,准确评估系统容量,提前发现系统隐患;
  2. 周期性系统健康体检:通过周期检测CPU利用率、内存利用率、接口QPS、接口TP95、异常数,取消订单数等指标是否异常,可以提前发现提前发现潜在问题、提前解决;
  3. 全链路关键日志:通过记录全链路关键日志,根据日志,自动分析反馈订单问题原因,给出处理结果,有效提高反馈处理效率。
  • 事中措施

事中措施的目的是为及时发现问题、快速解决问题。

事中这块,我们采取的手段包括:

  1. 订单监控大盘:实时监控订单业务指标,异常时报警;
  2. 系统监控大盘:实时监控订单系统指标,异常时报警;
  3. 完善的SOP:报警后,通过标准流程,快速定位问题、解决问题。
  • 事后措施

事后措施是指问题发生后,分析问题原因,彻底解决。并将相关经验教训反哺给事前、事中措施,不断加强事先、事中措施,争取尽量提前发现问题,将问题扼杀在萌芽阶段。

通过将之前人肉进行的运维操作自动化、智能化,提升了处理效率、减少了运维的人力投入。

看完以后是不是想拍个砖、留个言,抒发下己见?可以来微信公众号给我们评论,还能在第一时间获取我们发布的一些实践经验总结、最新的活动报名信息。关注可扫码:

公众号二维码

原文出自:https://tech.meituan.com/archives

全球互联网技术架构,前沿架构参考

联系我们博客/网站内容提交