若谷学院
互联网公司技术架构分享

大众点评支付渠道网关系统的实践之路

美团点评阅读(1927)

业务的快速增长,要求系统在快速迭代的同时,保持很好的扩展性和可用性。其中,交易系统除了满足上述要求之外,还必须保持数据的强一致性。对系统开发人员而言,这既是机遇,也是挑战。本文主要梳理大众点评支付渠道网关系统在面对这些成长烦恼时的演进之路,以及过程中的一些思考和实践。

在整个系统的演进过程中,核心思路是:大系统做小,做简单(具体描述可参考《高可用性系统在大众点评的实践与经验》)。在渠道网关系统实践过程中,可以明显区分出几个有代表性的阶段。

一、能用阶段

早期业务流量还不是很大,渠道网关系统业务逻辑也很简单,一句话总结就是:让用户在交易的时候,能顺利把钱给付了。做的事情可简单概括成3件:发起支付请求、接收支付成功通知以及用户要求退款时原路退回给用户的支付账户。这个阶段系统实践比较简单,主要就是“短、平、快”,快速接入新的第三方支付渠道并保证能用。系统架构如图1。
能用阶段

二、可用阶段

在系统演进初期的快速迭代过程中,接入的第三方支付渠道不多,系统运行还算比较平稳,一些简单问题也可通过开发人员人工快速解决。但随着接入的第三方支付渠道不断增多,逐渐暴露出一些新的问题:

(1) 所有的业务逻辑都在同一个物理部署单元,不同业务之间互相影响(例如退款业务出现问题,但是与此同时把支付业务也拖垮了);

(2) 随着业务流量的增大,数据库的压力逐渐增大,数据库的偶尔波动造成系统不稳定,对用户的支付体验影响很大;

(3) 支付、退款等状态的同步很大程度上依赖第三方支付渠道的异步通知,一旦第三方支付渠道出现问题,造成大量客诉,用户体验很差,开发、运营都很被动。

针对(1)中的业务之间互相影响问题,我们首先考虑进行服务拆分,将之前一个大的物理部署单元拆成多个物理部署单元。有两种明显的可供选择的拆分策略:

  • 按照渠道拆分,不同的第三方支付渠道独立一个物理部署单元,例如微信一个部署单元,支付宝一个部署单元等;
  • 按照业务类型拆分,不同的业务独立一个物理部署单元,例如支付业务一个部署单元,退款业务一个部署单元等。

考虑到在当时的流量规模下,支付业务优先级最高,退款等业务的优先级要低;而有些渠道的流量占比很小,作为一个独立的部署单元,会造成一定的资源浪费,且增加了系统维护的复杂度。基于此,我们做了一个符合当时系统规模的trade-off:选择了第2种拆分策略 — 按照业务类型拆分。

针对(2)中的DB压力问题,我们和DBA一起分析原因,最终选择了Master-Slave方案。通过增加Slave来缓解查询压力;通过强制走Master来保证业务场景的强一致性;通过公司的DB中间件Zebra来做负载均衡和灾备切换,保证DB的高可用性。

针对(3)中的状态同步问题,我们对不同渠道进行梳理,在已有的第三方支付渠道异步通知的基础上,通过主动查询定时批量同步状态,解决了绝大部分状态同步问题。对于仍未同步的少量Case,系统开放出供内部使用的API,方便后台接入和开发人员手动补单。

在完成上述的实践之后,渠道网关系统已达到基本可用阶段,通过内部监控平台可以看到,核心服务接口可用性都能达到99.9%以上。演化之后的系统架构如图2。
可用阶段

三、柔性可用阶段

在解决了业务隔离、DB压力、状态同步等问题后,渠道网关系统度过一段稳定可用的时期。但架不住业务飞速增长的压力,之前业务流量规模下的一些小的系统波动、流量冲击等异常,在遭遇流量洪峰时被急剧放大,最终可能成为压垮系统的最后一根稻草。在新的业务流量规模下,我们面临着新的挑战:

(1) 随着团队的壮大,新加入的同学在接入新的渠道或者增加新的逻辑时,往往都会优先选用自己熟悉的方式完成任务。但熟悉的不一定是合理的,有可能会引入新的风险。特别是在与第三方渠道对接时,系统目前在使用的HTTP交互框架就有 JDK HttpURLConnection/HttpsURLConnection、Httpclient3.x、Httpclient4.x(4.x版本内部还分别有使用不同的小版本)。仅在这个上面就踩过好几次惨痛的坑。

(2) 在按业务类型进行服务拆分后,不同业务不再互相影响。但同一业务内部,之前流量规模小的时候,偶尔波动一次影响不大,现在流量增大后,不同渠道之间就开始互相影响。例如支付业务,对外统一提供分布式的支付API,所有渠道共享同一个服务RPC连接池,一旦某一个渠道的支付接口性能恶化,导致大量占用服务RPC连接,其他正常渠道的请求都无法进来;而故障渠道性能恶化直接导致用户无法通过该渠道支付成功,连锁反应导致用户多次重试,从而进一步导致恶化加剧,最终引起系统雪崩,拒绝服务,且重启后的服务还有可能被大量的故障渠道重试请求给再次击垮。

(3) 目前接入的第三方支付渠道,无论是第三方支付公司、银行或是其他外部支付机构,基本都是通过重定向或SDK的方式引导用户完成最终支付动作。在这条支付链路中,渠道网关系统只是在后端与第三方支付渠道进行交互(生成支付重定向URL或预支付凭证),且只能通过第三方支付渠道的异步通知或自己主动进行支付查询才能得知最终用户支付结果。一旦某个第三方支付渠道内部发生故障,渠道网关系统完全无法得知该支付链路已损坏,这对用户支付体验造成损害。

(4) 现有的渠道网关的DB,某些非渠道网关服务仍可直接访问,这对渠道网关系统的DB稳定性、DB容量规划等带来风险,进而影响渠道网关系统的可用性,内部戏称被戴了“绿帽子”。

(5) 对于退款链路,系统目前未针对退款异常case进行统一收集、整理并分类,且缺乏一个清晰的退款链路监控。这导致用户申请退款后,少量用户的退款请求最终未处理成功,用户发起客诉。同时由于缺乏监控,导致这种异常退款缺乏一个后续推进措施,极端情形下,引起用户二次客诉,极大损害用户体验和公司信誉度。

为最大程度解决问题(1)中描述的风险,在吸取踩坑的惨痛教训后,我们针对第三方渠道对接,收集并整理不同的应用场景,抽象出一套接入框架。接入框架定义了请求组装、请求执行、响应解析和错误重试这一整套网关交互流程,屏蔽了底层的HTTP或Socket交互细节,并提供相应的扩展点。针对银行渠道接入存在前置机这种特殊的应用场景,还基于Netty抽象出连接池(Conn Pool)和简单的负载均衡机制(LB, 提供Round Robin路由策略)。不同渠道在接入时可插入自定义的组装策略(扩展已有的HttpReq、HttpsReq或NettyReq),执行策略[扩展已有(Http、Https或Netty)Sender/Receiver],解析策略(扩展已有的HttpResp、HttpsResp或NettyResp),并复用框架已提供的内容解析(binary/xml/json parser)、证书加载(keystore/truststore loader)和加解密签名(encrypt/decrypt/sign/verify sign)组件,从而在达到提高渠道接入效率的同时,尽可能减少新渠道接入带来的风险。接入框架的流程结构如图3。
渠道接入框架

为解决问题(2)中渠道之间相互影响,一个简单直观的思路就是渠道隔离。如何隔离,隔离到什么程度?这是2个主要的问题点:

  • 如何隔离 考虑过将支付服务进一步按照渠道拆分,将系统继续做小,但是拆分后,支付API的调用端需要区分不同渠道调用不同的支付API接口,这相当于将渠道隔离问题抛给了调用端;同时拆分后服务增多,调用端需要维护同一渠道支付业务的多个不同RPC-API,复杂度提高,增加了开发人员的维护负担,这在当前的业务流量规模下不太可取。所以我们选择了在同一个支付服务API内部进行渠道隔离。由于共用同一个支付服务服务API连接池,渠道隔离的首要目标就是避免故障渠道大量占用AP连接池,对其他正常渠道造成株连影响。如果能够自动检测出故障渠道,并在其发生故障的初期阶段就快速失败该故障渠道的请求,则从业务逻辑上就自动完成了故障渠道的隔离。
  • 隔离到什么程度 一个支付渠道下存在不同的支付方式(信用卡支付、借记卡支付、余额支付等),而有些支付方式(例如信用卡支付)还存在多个银行。所以我们直接将渠道隔离的最小粒度定义到支付渠道 -> 支付方式 -> 银行。

基于上述的思考,我们设计并实现了一个针对故障渠道的快速失败(fail-fast)机制:

  • 将每一笔支付请求所附带的支付信息抽象为一个特定的fail-fast路径,请求抽象成一个fail-fast事务,请求成功即认为事务成功,反之,事务失败。
  • 在fail-fast事务执行过程中,级联有2个fail-fast断路开关:
    • 静态开关,根据人工配置(on/off),断定某个支付请求是否需快速失败。
    • 动态开关,根据历史统计信息,确定当前健康状态,进而断定是否快速失败当前支付请求。
  • 动态断路开关抽象了3种健康状态(closed-放行所有请求;half_open-部分比例的请求放行;open-快速失败所有请求),并依据历史统计信息(总请求量/请求失败量/请求异常量/请求超时量),在其内部维护了一个健康状态变迁的状态机。状态变迁如图4。
    FailFast状态变迁
  • 状态机的每一次状态变迁都会产生一个健康状态事件,收银台服务可以监听这个健康状态事件,实现支付渠道的联动上下线切换。
  • 每一笔支付请求结束后都会动态更新历史统计信息。

经过线上流量模拟压测观察,fail-fast机制给系统支付请求增加了1~5ms的额外耗时,相比第三方渠道的支付接口耗时,占比1%~2%,属于可控范围。渠道故障fail-fast机制上线之后,结合压测配置,经过几次微调,稳定了线上环境的fail-fast配置参数。

在前不久的某渠道支付故障时,通过公司内部的监控平台,明显观察到fail-fast机制起到很好的故障隔离效果,如下图5。
故障隔离效果

为解决问题(3)中支付链路可用性监测,依赖公司内部的监控平台上报,实时监控支付成功通知趋势曲线;同时渠道网关系统内部从业务层面自行实现了支付链路端到端的监控。秒级监控支付链路端到端支付成功总量及支付成功率,并基于这2个指标的历史统计信息,提供实时的支付链路邮件或短信报警。而在流量高峰时,该监控还可通过人工手动降级(异步化或关闭)。这在很大程度上提高了开发人员的核心支付链路故障响应速度。

为解决问题(4)中的“绿帽子”,渠道网关系统配合DBA回收所有外部系统的DB直接访问权限,提供替换的API以供外部系统访问,这给后续的提升DB稳定性、DB容量规划以及后续可能的异步多机房部署打下基础。

针对问题(5)中退款case,渠道网关系统配合退款链路上的其他交易、支付系统,从源头上对第三方渠道退款异常case进行统一收集、整理并分类,并形成退款链路核心指标(退款当日成功率/次日成功率/7日成功率)监控,该部分的系统实践会随着后续的“退款链路统一优化”一起进行分享;

随着上述实践的逐步完成,渠道网关系统的可用性得到显著提高,核心链路的API接口可用性达到99.99%,在公司的917大促中,渠道网关系统平稳度过流量高峰,并迎来了新的记录:提交第三方渠道支付请求的TPS达到历史新高。且在部分渠道接口发生故障时,能保证核心支付API接口的稳定性,并做到故障渠道的自动检测、恢复,实现收银台对应渠道的联动上下线切换。同时,通过核心支付链路支付成功率监控,实现第三方渠道内部故障时,渠道上下线的手动切换。至此,基本保证了在部分第三方渠道有损的情况下,渠道网关系统的柔性可用。演化后的此阶段系统架构如图6。
柔性可用阶段

四、经验与总结

在整个渠道网关系统一步步的完善过程中,踩过很多坑,吃过很多教训,几点小的收获:

  1. 坚持核心思想,拆分、解耦,大系统做小,做简单;
  2. 系统总会有出问题的时候,重要的是如何快速定位、恢复、解决问题,这是一个长期而又艰巨的任务;
  3. 高可用性的最大敌人不仅是技术,还是使用技术实现系统的人,如何在业务、系统快速迭代的过程中,保证自我驱动,不掉队;
  4. 高流量,大并发对每一个工程师既是挑战,更是机遇。

原文出自:https://tech.meituan.com/archives

专访美团外卖曹振团:天下武功唯快不破

美团点评阅读(1887)

本文转自InfoQ中文网站,首发地址:http://www.infoq.com/cn/news/2016/06/Meituan-take-away

马云曾经说过:世界是懒人创造出来的。在“懒人”们的推动下,O2O的战火已经燃烧到了外卖行业。据报告,2015年外卖市场年交易额已经达到450余亿元,其中美团外卖从2013年底上线以来,已经迅速超过了400万日订单。作为美团内部的创业项目,美团外卖是如何一步步从复用美团的基础服务,到如今具有在高并发下百万级别订单的业务系统的。

2016年7月15-16日,ArchSummit全球架构师峰会将在深圳举行。本届大会,我们邀请了美团外卖技术专家曹振团老师,前来分享《美团外卖系统架构演进与系统稳定性经验谈》的内容,讲述在美团外卖业务的高速发展中,系统架构各阶段面临的不同挑战以及对应的解决方案。

现在我们就来采访曹振团老师,让老师给大家送一份充满干货的“外卖”。

InfoQ:在你认为,怎样的架构是一个好架构,怎样的架构师是一个好的架构师?能否谈谈如何避免一个“坏”架构的诞生?

曹振团:一个好的架构是与业务特点及与业务发展阶段相结合的、因地制宜且与时俱进的。一个好的架构既能支撑业务的快速稳定发展,又能留有一定的灵活性和扩展性。而好的架构师需要做好三点:

  1. 首先需要充分理解业务,只有充分理解业务之后才能使架构不仅能够很好地支持业务特点,并具有一定的前瞻性。架构师需要站在推进业务发展的角度上合理地改进和优化架构设计,为业务的快速发展做好保障。
  2. 其次,架构师要具备良好的沟通和协调能力。架构师往往要面临着跨组、跨团队、跨事业部甚至跨事业群的一些技术方案,需要沟通和协调各方的诉求和冲突。
  3. 最后要具备持续学习的能力。不仅要学习技术知识的变化,更要学习业界的发展思路、最佳实践等。

好的架构是实践出来的,好的架构师也是在实践中快速成长起来的。新美大内部对架构师的培养非常注重实践和交流。架构师都在业务一线深入了解业务、需求及技术特性。美团外卖业务在快速发展的过程中踩了很多坑,这也让我们的架构师更快的成长起来。新美大内部各事业群的Casestudy都是共享的,这也是一个非常好的学习平台,从他人的经验和总结中学习。另外内部开设有架构专题分享,提供给大家一个架构实践的交流和讨论的平台。

架构通常是为了解决系统的高并发、高性能、高可用的问题,结合业务特点在研发资源、排期、技术方案之间做平衡。一个“坏”的架构则破坏了这种平衡,比如:由于工期紧张而引入了一个自己并不能把控的技术方案,为系统的稳定性埋下了雷。还有一个简单的判断标准是:当采用这个架构后在未来多长时间或几倍增量下需要调整架构。基本上要求至少在未来的半年到一年内或2倍增量下不需要调整架构。如果架构设计评审不符合这个标准就要及时重新设计或调整。

InfoQ:你曾在网易工作过,能否简单谈谈在架构设计上美团外卖与网易各自的特点是什么?

曹振团:门户网站面对着海量的新闻事件及分类,具有更新量大、过期快和热点集中的特点。新闻资料基本是静态的,所以对缓存技术应用较多。

美团外卖涉及到交易的信息、支付、履约的各个环节,参与方较多。从用户支付下单到商家接单,还有骑手配送等环节,这些过程都是动态有交互的,美团外卖对服务化及服务治理设计较多。

InfoQ:你加入美团后有参与多个创新业务的探索,那么你认为在业务初期好的架构是必要的吗,为什么呢?能否分享经历了多个创新业务之后的收获,这些经验给美团外卖带来了什么样的收获和挑战?

曹振团:2013年加入美团时,我们在O2O这个方向上进行了多个创新业务的探索,完全是新的事情,从零开始。在早期最重要的事情就是验证需求,验证产品是否能够满足用户的核心需求,是否能够被用户接受。这一阶段就是快速试错,通常以MVP的方式快速迭代。我们需要足够快地找到方向,此时灵活性优先于架构。

加入新美大的早期参与公司内部的创业,在O2O这个领域进行了多个创新业务的探索,最后决定做外卖这个业务。在经历创新业务后最大的收获主要讲两点:

  1. 创业的早期决定不做什么往往比决定做什么更重要。在早期产品技术资源都很紧缺,需要将有限的资源投入到最核心的产品需求上。
  2. 天下武功唯快不破。如今的市场瞬息万变,唯有快速的迭代,方能抢占先机。

创业会给技术带来大量的挑战:首先需要是个全面手,前端、后台、数据库、运维都能够迅速切入。另外也要有足够的技术视野和深度,才能够在技术方案的选择上游刃有余,既能快速的选择合适的方案,又能够把握其技术原理。

InfoQ:美团外卖创建初期是否借鉴了美团的架构设计?美团外卖架构的创建和发展经历了什么样的过程?

曹振团:美团外卖创建初期复用了很多美团的基础服务,主要借鉴了团购业务积累的经验。 美团外卖的架构是随着业务的发展优化和演进的。主要经历了以下几个过程:

  1. 自由发展阶段:业务起步的时候,大家公用服务和数据库表,这样能够快速支持产品迭代。产品和技术人员聚焦在快速验证产品功能上。到底应该有哪些功能,用户会如何点外卖等?这个阶段主要的特点就是集中,所有的功能都集中在几个项目里,所有的表都集中在一个库中。
  2. 故障驱动架构:随着业务的爆发增长,早期的架构出现了很多的问题,系统频繁地出现稳定性的问题,共享数据库表导致业务逻辑散落各地、甚至实现不一致的情况。这时系统稳定性问题倒逼架构进行优化调整,进行了服务化拆分,服务之间全部用接口的方式调用。
  3. 架构驱动改革:随着单量的快速增长,系统故障所造成的损失是巨大的、不可接受的。需要从架构驱动技术体系的改进、甚至推进产品和业务的变革。同时增加业务的容灾能力,进行了多机房的部署。

InfoQ:在外卖交易流程中,哪个环节最容易出现问题,你们是如何解决的?

曹振团:美团外卖在快速发展的过程中踩了很多坑。比如发版引发的故障,这类问题一般是最多的。发版的新功能可能含有bug,可能含有慢SQL,可能没有正确处理网络异常等。外卖业务还是一个实时且流程较长的业务,尤其流量比较集中在中午和晚上的饭点,在高峰时段的QPS是万级别的,外卖整个链条还会涉及到很多的服务,如果某个依赖服务有问题将会影响到其它服务。例如没有设置合理的RPC超时时间,如果服务A挂了,会导致依赖该服务的其它服务出现问题,进而引起连锁反应,最终系统雪崩。

针对这些问题,我们做了关键路径梳理。整个交易流程中梳理出关键路径,非关键调用异步化,关键路径做好容错容灾设计,实时监控关键路径的核心指标。整体系统做了比较全面的应急预案,当有紧急情况发生时,力保关键路径不出问题。

至于如何避免掉坑,分享几点:

  1. 防御式编程,不要相信任何人或服务,做好对自身服务的保护和对依赖服务的熔断。
  2. 每次发版都有预案。回滚是解决发版引发故障的最快捷有效的手段。
  3. 灰度!灰度!灰度!
  4. 完善的监控很重要,需要覆盖系统性能及业务指标,并对这些指标熟知和敏感。

InfoQ: 在美团外卖业务急速发展的同时,目前美团外卖架构上是否有需要改进和突破的地方?是否有关注前沿的技术,目前通过什么渠道来发现持续优化架构的方法?

曹振团: 业务屡创新高及团队规模的扩大,给技术架构带来了越来越多的挑战。如何能够在大规模作战的情况下,既能保持业务开发的敏捷性又能保障系统的稳定性,是我们架构中要重点思考的,我们一直在结合着业务发展的节奏,优化和改进技术架构。一方面要与时俱进地支持多品类的业务扩展及平台化建设;另一方面,在永恒不变的话题“稳定性”方面,在为能够支持更高的业务量,持续抽取和优化基础服务,加强中间件的能力,使各服务能够更聚焦和专注。因此加强基础设施和强固中间件的建设是我们要改进和突破的地方。夯实技术基础设施和中间件,让研发力量更聚焦在业务开发中。

美团外卖在技术架构中大量使用了开源的技术组件,在深入调研后会结合外卖的业务特点做二次开发,感谢Java社区里丰富的开源软件,使得我们能够更好地优化我们的架构。另外一个发现和持续优化架构的方法是全链路的在线压测。我们会定期进行全链路的在线压测,发现系统的瓶颈并优化验证。

InfoQ: 在架构演进过程中分别使用了哪些核心指标来判断公司架构是否改善?你们对核心指标的要求是多少,目前是否已经稳定达到?

曹振团:架构的演进是为了能够提供更可靠的SLA,核心的考量指标有:5倍的容量,4个9的可用性,TP99 200ms的响应时间,目前已经可以稳定达到了。这些指标界定了系统能够在一定的容量下具有稳定和可靠的表现。我们会定期地做全链路的在线压测,通过实时的数据指标监控系统分析和验证是否已经达成架构目标。可用性和响应时间在服务端还是比较好衡量和达成的,但是从用户端看就变得复杂了很多。比如用户的设备、网络环境等都可能会对结果造成影响,我们也在加强用户侧的监控,持续地优化和改进用户体验。

InfoQ: 外卖领域作为一种懒人经济,你们认为这些“懒人”在数据上体现了怎样的特征?

曹振团:有一种说法是“世界是由懒人驱动的”。由于懒得去换台,发明了遥控器。懒得爬楼梯,发明了电梯。如果你懒得出去吃,我们可以送外卖上门。懒是一个强需求。

我们在解决这个强需求的过程中,发现了一些有趣的变化:由于出现了这样的外卖平台,满足了大家懒的需求,越发使大家变“懒”了,平均点外卖的次数在变多,花样在变多(大家的口味在变多)。虽然变“懒”了,但是对品质的要求却在提高。我们通过优化配送链条上的服务,使得能够更及时、快速地为“懒人”提供美食。配送变快了,口感有了更好的保证。这又促使一些之前不提供外卖服务的品牌商家加入到外卖平台上来,使得整个生态更加良性发展。

另外一个特征是,“懒人”的需求也是多样的,“懒人”们希望能够提供更多的外送选择,这将会改写“外卖”的定义,它将会含有更广阔和丰富的含义,而不只是 “美食外卖”。

InfoQ: 作为架构师,有什么感悟和经验和大家分享吗?

曹振团: 主要分享几点吧:

  1. 性能是功能的一部分,稳定是功能的一部分。性能和稳定性需要在开发设计的时候作为产品功能的一部分来考虑,而不是扩展属性。这样才能在开发、测试、上线、运行中全程把握。
  2. 简单即美。把复杂的事情简单化,抓住核心脉络,解决好主要矛盾不见得一个大而全的海纳百川的架构才是好的,相反清晰、明了的架构是我们追求的,简单可依赖。
  3. 使用能够驾驭的技术。丰富的开源社区给了我们很多的选择和视角,任何引入的开源技术都应该理解其原理和本质,只有能够驾驭的技术才能用好。
  4. 细节决定成败。软件架构是一项复杂和精细的工作,只有把每个细节都做极致了,才能保障架构的基石稳健。

InfoQ:感谢曹振团老师接受我们的采访。期待你在ArchSummit全球架构师峰会上的分享。

原文出自:https://tech.meituan.com/archives

Cache应用中的服务过载案例研究

美团点评阅读(1839)

简单地说,过载是外部请求对系统的访问量突然激增,造成请求堆积,服务不可用,最终导致系统崩溃。本文主要分析引入Cache可能造成的服务过载,并讨论相关的预防、恢复策略。Cache在现代系统中使用广泛,由此引入的服务过载隐患无处不在,但却非常隐蔽,容易被忽视。本文希望能为开发者在设计和编写相关类型应用,以及服务过载发生处理时能够有章可循。

一个服务过载案例

本文讨论的案例是指存在正常调用关系的两个系统(假设调用方为A系统,服务方为B系统),A系统对B系统的访问突然超出B系统的承受能力,造成B系统崩溃。造成服务过载的原因很多,这里分析的是严重依赖Cache的系统服务过载。首先来看一种包含Cache的体系结构(如下图所示)。

A系统依赖B系统的读服务,A系统是60台机器组成的集群,B系统是6台机器组成的集群,之所以6台机器能够扛住60台机器的访问,是因为A系统并不是每次都访问B,而是首先请求Cache,只有Cache的相应数据失效时才会请求B。

这正是Cache存在的意义,它让B系统节省了大量机器;如果没有Cache,B系统不得不组成60台机器的集群,如果A也同时依赖除B系统外的另一个系统(假设为C系统)呢?那么C系统也要60台机器,放大的流量将很快耗尽公司的资源。

然而Cache的引入也不是十全十美的,这个结构中如果Cache发生问题,全部的流量将流向依赖方,造成流量激增,从而引发依赖系统的过载。

回到A和B的架构,造成服务过载的原因至少有下面三种:

  1. B系统的前置代理发生故障或者其他原因造成B系统暂时不可用,等B系统系统服务恢复时,其流量将远远超过正常值。
  2. Cache系统故障,A系统的流量将全部流到B系统,造成B系统过载。
  3. Cache故障恢复,但这时Cache为空,Cache瞬间命中率为0,相当于Cache被击穿,造成B系统过载。

第一个原因不太好理解,为什么B系统恢复后流量会猛增呢?主要原因就是缓存的超时时间。当有数据超时的时候,A系统会访问B系统,但是这时候B系统偏偏故障不可用,那么这个数据只好超时,等发现B系统恢复时,发现缓存里的B系统数据已经都超时了,都成了旧数据,这时当然所有的请求就打到了B。

下文主要介绍服务过载的预防和发生后的一些补救方法,以预防为主,从调用方和服务方的视角阐述一些可行方案。

服务过载的预防

所谓Client端指的就是上文结构中的A系统,相对于B系统,A系统就是B系统的Client,B系统相当于Server。

Client端的方案

针对上文阐述的造成服务过载的三个原因:B系统故障恢复、Cache故障、Cache故障恢复,我们看看A系统有哪些方案可以应对。

合理使用Cache应对B系统宕机

一般情况下,Cache的每个Key除了对应Value,还对应一个过期时间T,在T内,get操作直接在Cache中拿到Key对应Value并返回。但是在T到达时,get操作主要有五种模式:

1. 基于超时的简单(stupid)模式

在T到达后,任何线程get操作发现Cache中的Key和对应Value将被清除或标记为不可用,get操作将发起调用远程服务获取Key对应的Value,并更新写回Cache,然后get操作返回新值;如果远程获取Key-Value失败,则get抛出异常。

为了便于理解,举一个码头工人取货的例子:5个工人(线程)去港口取同样Key的货(get),发现货已经过期被扔掉了,这时5个工人各自分别去对岸取新货,然后返回。

2. 基于超时的常规模式

在T到达后,Cache中的Key和对应Value将被清除或标记为不可用,get操作将调用远程服务获取Key对应的Value,并更新写回Cache;此时,如果另一个线程发现Key和Value已经不可用,get操作还需要判断有没有其他线程发起了远程调用,如果有,那么自己就等待,直到那个线程远程获取操作成功,Cache中得Key变得可用,get操作返回新的Value。如果远程获取操作失败,则get操作抛出异常,不会返回任何Value。

还是码头工人的例子:5个工人(线程)去港口取同样Key的货(get),发现货已经过期被扔掉了,那么只需派出一个人去对岸取货,其他四个人在港口等待即可,而不用5个人全去。

基于超时的简单模式和常规模式区别在于对于同一个超时的Key,前者每个get线程一旦发现Key不存在,则发起远程调用获取值;而后者每个get线程发现Key不存在,则还要判断当前是否有其他线程已经发起了远程调用操作获取新值,如果有,自己就简单的等待即可。

显然基于超时的常规模式比基于超时的简单模式更加优化,减少了超时时并发访问后端的调用量。

实现基于超时的常规模式就需要用到经典的Double-checked locking惯用法了。

3. 基于刷新的简单(stupid)模式

在T到达后,Cache中的Key和相应Value不动,但是如果有线程调用get操作,将触发refresh操作,根据get和refresh的同步关系,又分为两种模式:

  • 同步模式:任何线程发现Key过期,都触发一次refresh操作,get操作等待refresh操作结束,refresh结束后,get操作返回当前Cache中Key对应的Value。注意refresh操作结束并不意味着refresh成功,还可能抛了异常,没有更新Cache,但是get操作不管,get操作返回的值可能是旧值。
  • 异步模式:任何线程发现Key过期,都触发一次refresh操作,get操作触发refresh操作,不等refresh完成,直接返回Cache中的旧值。

举上面码头工人的例子说明基于刷新的常规模式:这次还是5工人去港口取货,这时货都在,但是已经旧了,这时5个工人有两种选择:

  • 5个人各自去远程取新货,如果取货失败,则拿着旧货返回(同步模式)
  • 5个人各自通知5个雇佣工去取新货,5个工人拿着旧货先回(异步模式)

4. 基于刷新的常规模式

在T到达后,Cache中的Key和相应Value都不会被清除,而是被标记为旧数据,如果有线程调用get操作,将触发refresh更新操作,根据get和refresh的同步关系,又分为两种模式:

  • 同步模式:get操作等待refresh操作结束,refresh结束后,get操作返回当前Cache中Key对应的Value,注意:refresh操作结束并不意味着refresh成功,还可能抛了异常,没有更新Cache,但是get操作不管,get操作返回的值可能是旧值。如果其他线程进行get操作,Key已经过期,并且发现有线程触发了refresh操作,则自己不等refresh完成直接返回旧值。
  • 异步模式:get操作触发refresh操作,不等refresh完成,直接返回Cache中的旧值。如果其他线程进行get操作,发现Key已经过期,并且发现有线程触发了refresh操作,则自己不等refresh完成直接返回旧值。

再举上面码头工人的例子说明基于刷新的常规模式:这次还是5工人去港口取货,这时货都在,但是已经旧了,这时5个工人有两种选择:

  • 派一个人去远方港口取新货,其余4个人拿着旧货先回(同步模式)。
  • 5个人通知一个雇佣工去远方取新货,5个人都拿着旧货先回(异步模式)。

基于刷新的简单模式和基于刷新的常规模式区别就在于取数线程之间能否感知当前数据是否正处在刷新状态,因为基于刷新的简单模式中取数线程无法感知当前过期数据是否正处在刷新状态,所以每个取数线程都会触发一个刷新操作,造成一定的线程资源浪费。

而基于超时的常规模式和基于刷新的常规模式区别在于前者过期数据将不能对外访问,所以一旦数据过期,各线程要么拿到数据,要么抛出异常;后者过期数据可以对外访问,所以一旦数据过期,各线程要么拿到新数据,要么拿到旧数据。

5. 基于刷新的续费模式

该模式和基于刷新的常规模式唯一的区别在于refresh操作超时或失败的处理上。在基于刷新的常规模式中,refresh操作超时或失败时抛出异常,Cache中的相应Key-Value还是旧值,这样下一个get操作到来时又会触发一次refresh操作。

在基于刷新的续费模式中,如果refresh操作失败,那么refresh将把旧值当成新值返回,这样就相当于旧值又被续费了T时间,后续T时间内get操作将取到这个续费的旧值而不会触发refresh操作。

基于刷新的续费模式也像常规模式那样分为同步模式和异步模式,不再赘述。

下面讨论这5种Cache get模式在服务过载发生时的表现,首先假设如下:

  • 假设A系统的访问量为每分钟M次。
  • 假设Cache能存Key为C个,并且Key空间有N个。
  • 假设正常状态下,B系统访问量为每分钟W次,显然W\<\<M。

这时因为某种原因,比如B长时间故障,造成Cache中得Key全部过期,B系统这时从故障中恢复,五种get模式分析表现分析如下:

  1. 在基于超时和刷新的简单模式中,B系统的瞬间流量将达到和A的瞬时流量M大体等同,相当于Cache被击穿。这就发生了服务过载,这时刚刚恢复的B系统将肯定会被大流量压垮。
  2. 在基于超时和刷新的常规模式中,B系统的瞬间流量将和Cache中Key空间N大体等同。这时是否发生服务过载,就要看Key空间N是否超过B系统的流量上限了。
  3. 在基于刷新的续费模式中,B系统的瞬间流量为W,和正常情况相同而不会发生服务过载。实际上,在基于刷新的续费模式中,不存在Cache Key全部过期的情况,就算把B系统永久性地干掉,A系统的Cache也会基于旧值长久的平稳运行。

第3点,B系统不会发生服务过载的主要原因是基于刷新的续费模式下不会出现chache中的Key全部长时间过期的情况,即使B系统长时间不可用,基于刷新的续费模式也会在一个过期周期内把旧值当成新值继续使用。所以当B系统恢复时,A系统的Cache都处在正常工作状态。

从B系统的角度看,能够抵抗服务过载的基于刷新的续费模式最优。

从A系统的角度看,由于一般情况下A系统是一个高访问量的在线web应用,这种应用最讨厌的一个词就是“线程等待”,因此基于刷新的各种异步模式较优。

综合考虑,基于刷新的异步续费模式是首选

然而凡是有利就有弊,有两点需要注意的地方:

  1. 基于刷新模式最大的缺点是Key-Value一旦放入Cache就不会被清除,每次更新也是新值覆盖旧值,JVM GC永远无法对其进行垃圾收集,而基于超时的模式中,Key-Value超时后如果新的访问没有到来,内存是可以被GC垃圾回收的。所以如果你使用的是寸土寸金的本地内存做Cache就要小心了。
  2. 基于刷新的续费模式需要做好监控,不然有可能Cache中的值已经和真实的值相差很远了,应用还以为是新值而使用。

关于具体的Cache,来自Google的Guava本地缓存库支持上文的第二种、第四种和第五种get操作模式。

但是对于Redis等分布式缓存,只提供原始的get、set方法,而提供的get仅仅是获取,与上文提到的五种get操作模式不是一个概念。开发者想用这五种get操作模式的话不得不自己封装和实现。

五种get操作模式中,基于超时和刷新的简单模式是实现起来最简单的模式,但遗憾的是这两种模式对服务过载完全无免疫力,这可能也是服务过载在大量依赖缓存的系统中频繁发生的一个重要原因吧。

本文之所以把第1、3种模式称为stupid模式,是想强调这种模式应该尽量避免,Guava里面根本没有这种模式,而Redis只提供简单的读写操作,很容易就把系统实现成了这种方式。

应对分布式Cache宕机

如果是Cache直接挂了,那么就算是基于刷新的异步续费模式也无能为力了。这时A系统铁定无法对Cache进行存取操作,只能将流量完全打到B系统,B系统面对服务过载在劫难逃……

本节讨论的预防Cache宕机仅限于分布式Cache,因为本地Cache一般和A系统应用共享内存和进程,本地Cache挂了A系统也挂了,不会出现本地Cache挂了而A系统应用正常的情况。

首先,A系统请求线程检查分布式Cache状态,如果无应答则说明分布式Cache挂了,则转向请求B系统,这样一来大流量将压垮B系统。这时可选的方案如下:

  1. A系统的当前线程不请求B系统,而是打个日志并设置一个默认值。
  2. A系统的当前线程按照一定概率决定是否请求B系统。
  3. A系统的当前线程检查B系统运行情况,如果良好则请求B系统。

方案1最简单,A系统知道如果没有Cache,B系统可能扛不住自己的全部流量,索性不请求B系统,等待Cache恢复。但这时B系统利用率为0,显然不是最优方案,而且当请求的Value不容易设置默认值时,这个方案就不行了。

方案2可以让一部分线程请求B系统,这部分请求肯定能被B系统hold住。可以保守的设置这个概率 u =(B系统的平均流量)/(A系统的峰值流量)

方案3是一种更为智能的方案,如果B系统运行良好,当前线程请求;如果B系统过载,则不请求,这样A系统将让B系统处于一种宕机与不宕机的临界状态,最大限度挖掘B系统性能。这种方案要求B系统提供一个性能评估接口返回Yes和No,Yes表示B系统良好,可以请求;No表示B系统情况不妙,不要请求。这个接口将被频繁调用,必须高效。

方案3的关键在于如何评估一个系统的运行状况。一个系统中当前主机的性能参数有CPU负载、内存使用率、Swap使用率、GC频率和GC时间、各个接口平均响应时间等,性能评估接口需要根据这些参数返回Yes或者No,是不是机器学习里的二分类问题?😄关于这个问题已经可以单独写篇文章讨论了,在这里就不展开了,你可以想一个比较简单傻瓜的保守策略,缺点是A系统的请求无法很好的逼近B系统的性能极限。

综合以上分析,方案2比较靠谱。如果选择方案3,建议由专门团队负责研究并提供统一的系统性能实时评估方案和工具。

应对分布式Cache宕机后的恢复

不要以为成功hold住分布式Cache宕机就万事大吉了,真正的考验是分布式Cache从宕机过程恢复之后,这时分布式Cache中什么都没有。

即使是上文中提到了基于刷新的异步续费策略这时也没用,因为分布式Cache为空,无论如何都要请求B系统。这时B系统的最大流量是Key的空间取值数量。

如果Key的取值空间数量很少,则相安无事;如果Key的取值空间数量大于B系统的流量上限,服务过载依然在所难免。

这种情况A系统很难处理,关键原因是A系统请求Cache返回Key对应Value为空,A系统无法知道是因为当前Cache是刚刚初始化,所有内容都为空;还是因为仅仅是自己请求的那个Key没在Cache里

如果是前者,那么当前线程就要像处理Cache宕机那样进行某种策略的回避;如果是后者,直接请求B系统即可,因为这是正常的Cache使用流程。

对于Cache宕机的恢复,A系统真的无能为力,只能寄希望于B系统的方案了。

Server端的方案

相对于Client端需要应对各种复杂问题,Server端需要应对的问题非常简单,就是如何从容应对过载的问题。无论是缓存击穿也好,还是拒绝服务攻击也罢,对于Server端来说都是过载保护的问题。对于过载保护,主要给出两种可行方案,以及一种比较复杂的方案思路。

流量控制

流量控制就是B系统实时监控当前流量,如果超过预设的值或者系统承受能力,则直接拒绝掉一部分请求,以实现对系统的保护。

流量控制根据基于的数据不同,可分为两种:

  1. 基于流量阈值的流控:流量阈值是每个主机的流量上限,流量超过该阈值主机将进入不稳定状态。阈值提前进行设定,如果主机当前流量超过阈值,则拒绝掉一部分流量,使得实际被处理流量始终低于阈值。
  2. 基于主机状态的流控:每个接受每个请求之前先判断当前主机状态,如果主机状况不佳,则拒绝当前请求。

基于阈值的流控实现简单,但是最大的问题是需要提前设置阈值,而且随着业务逻辑越来越复杂,接口越来越多,主机的服务能力实际应该是下降的,这样就需要不断下调阈值,增加了维护成本,而且万一忘记调整的话,呵呵……

主机的阈值可以通过压力测试确定,选择的时候可以保守些。

基于主机状态的流控免去了人为控制,但是其最大的确定上文已经提到:如何根据当前主机各个参数判断主机状态呢?想要完美的回答这个问题目测并不容易,因此在没有太好答案之前,我推荐基于阈值的流控。

流量控制基于实现位置的不同,又可以分为两种:

  1. 反向代理实现流控:在反向代理如Nginx上基于各种策略进行流量控制。这种一般针对HTTP服务。
  2. 借助服务治理系统:如果Server端是RMI、RPC等服务,可以构建专门的服务治理系统进行负载均衡、流控等服务。
  3. 服务容器实现流控:在应用代码里,业务逻辑之前实现流量控制。

第3种在服务器的容器(如Java容器)中实现流控并不推荐,因为流控和业务代码混在一起容易混乱;其次实际上流量已经全量进入到了业务代码里,这时的流控只是阻止其进入真正的业务逻辑,所以流控效果将打折;还有,如果流量策略经常变动,系统将不得不为此经常更改。

因此,推荐前两种方式。

最后提一个注意点:当因为流控而拒绝请求时,务必在返回的数据中带上相关信息(比如“当前请求因为超出流量而被禁止访问”),如果返回值什么都没有将是一个大坑。因为造成调用方请求没有被响应的原因很多,可能是调用方Bug,也可能是服务方Bug,还可能是网络不稳定,这样一来很可能在排查一整天后发现是流控搞的鬼……

服务降级

服务降级一般由人为触发,属于服务过载造成崩溃恢复时的策略,但为了和流控对比,将其放到这里。

流量控制本质上是减小访问量,而服务处理能力不变;而服务降级本质上是降低了部分服务的处理能力,增强另一部分服务处理能力,而访问量不变。

服务降级是指在服务过载时关闭不重要的接口(直接拒绝处理请求),而保留重要的接口。比如服务由10个接口,服务降级时关闭了其中五个,保留五个,这时这个主机的服务处理能力将增强到二倍左右。

然而,服务过载发生时动辄就超出系统处理能力10倍,而服务降级能使主机服务处理能力提高10倍么?显然很困难,因此服务过载的应对不能只依靠服务降级策略。

动态扩展

动态扩展指的是在流量超过系统服务能力时,自动触发集群扩容,自动部署并上线运行;当流量过去后又自动回收多余机器,完全弹性。

这个方案是不是感觉很不错。但是目前互联网公司的在线应用跑在云上的本身就不多,要完全实现在线应用的自动化弹性运维,要走的路就更多了。

崩溃恢复

如果服务过载造成系统崩溃还是不幸发生了,这时需要运维控制流量,等后台系统启动完毕后循序渐进的放开流量,主要目的是让Cache慢慢预热。流量控制刚开始可以为10%,然后20%,然后50%,然后80%,最后全量,当然具体的比例,尤其是初始比例,还要看后端承受能力和前端流量的比例,各个系统并不相同。

如果后端系统有专门的工具进行Cache预热,则省去了运维的工作,等Cache热起来再发布后台系统即可。但是如果Cache中的Key空间很大,开发预热工具将比较困难。

结论

“防患于未然”放在服务过载的应对上也是适合的,预防为主,补救为辅。综合上文分析,具体的预防要点如下:

  1. 调用方(A系统)采用基于刷新的异步续费模式使用Cache,或者至少不能使用基于超时或刷新的简单(stupid)模式。
  2. 调用方(A系统)每次请求Cache时检查Cache是否可用(available),如果不可用则按照一个保守的概率访问后端,而不是无所顾忌的直接访问后端。
  3. 服务方(B系统)在反向代理处设置流量控制进行过载保护,阈值需要通过压测获得。

崩溃的补救主要还是靠运维和研发在发生时的通力合作:观察流量变化准确定位崩溃原因,运维控流量研发持续关注性能变化。

未来如果有条件的话可以研究下主机应用健康判断问题和动态弹性运维问题,毕竟自动化比人为操作要靠谱。

原文出自:https://tech.meituan.com/archives

消息队列设计精要

美团点评阅读(1853)

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。
本文不会一一介绍这些消息队列的所有特性,而是探讨一下自主开发设计一个消息队列时,你需要思考和设计的重要方面。过程中我们会参考这些成熟消息队列的很多重要思想。
本文首先会阐述什么时候你需要一个消息队列,然后以Push模型为主,从零开始分析设计一个消息队列时需要考虑到的问题,如RPC、高可用、顺序和重复消息、可靠投递、消费关系解析等。
也会分析以Kafka为代表的pull模型所具备的优点。最后是一些高级主题,如用批量/异步提高性能、pull模型的系统设计理念、存储子系统的设计、流量控制的设计、公平调度的实现等。其中最后四个方面会放在下篇讲解。

何时需要消息队列

当你需要使用消息队列时,首先需要考虑它的必要性。可以使用mq的场景有很多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。反之,如果需要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。

解耦

解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事务,只关心核心的流程。而需要依赖其他系统但不那么重要的事情,有通知即可,无需等待结果。换句话说,基于消息的模型,关心的是“通知”,而非“处理”。
比如在美团旅游,我们有一个产品中心,产品中心上游对接的是主站、移动后台、旅游供应链等各个数据源;下游对接的是筛选系统、API系统等展示系统。当上游的数据发生变更的时候,如果不使用消息系统,势必要调用我们的接口来更新数据,就特别依赖产品中心接口的稳定性和处理能力。但其实,作为旅游的产品中心,也许只有对于旅游自建供应链,产品中心更新成功才是他们关心的事情。而对于团购等外部系统,产品中心更新成功也好、失败也罢,并不是他们的职责所在。他们只需要保证在信息变更的时候通知到我们就好了。
而我们的下游,可能有更新索引、刷新缓存等一系列需求。对于产品中心来说,这也不是我们的职责所在。说白了,如果他们定时来拉取数据,也能保证数据的更新,只是实时性没有那么强。但使用接口方式去更新他们的数据,显然对于产品中心来说太过于“重量级”了,只需要发布一个产品ID变更的通知,由下游系统来处理,可能更为合理。
再举一个例子,对于我们的订单系统,订单最终支付成功之后可能需要给用户发送短信积分什么的,但其实这已经不是我们系统的核心流程了。如果外部系统速度偏慢(比如短信网关速度不好),那么主流程的时间会加长很多,用户肯定不希望点击支付过好几分钟才看到结果。那么我们只需要通知短信系统“我们支付成功了”,不一定非要等待它处理完成。

最终一致性

最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。
业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。
以一个银行的转账过程来理解最终一致性,转账的需求很简单,如果A系统扣钱成功,则B系统加钱一定成功。反之则一起回滚,像什么都没发生一样。
然而,这个过程中存在很多可能的意外:

  1. A扣钱成功,调用B加钱接口失败。
  2. A扣钱成功,调用B加钱接口虽然成功,但获取最终结果时网络异常引起超时。
  3. A扣钱成功,B加钱失败,A想回滚扣的钱,但A机器down机。

可见,想把这件看似简单的事真正做成,真的不那么容易。所有跨VM的一致性问题,从技术的角度讲通用的解决方案是:

  1. 强一致性,分布式事务,但落地太难且成本太高,后文会具体提到。
  2. 最终一致性,主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新搞一遍,直到成功为止。
    回到刚才的例子,系统在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。
    整个这个模型依然可以基于RPC来做,但可以抽象成一个统一的模型,基于消息队列来做一个“企业总线”。
    具体来说,本地事务维护业务变化和通知消息,一起落地(失败则一起回滚),然后RPC到达broker,在broker成功落地后,RPC返回成功,本地消息可以删除。否则本地消息一直靠定时任务轮询不断重发,这样就保证了消息可靠落地broker。
    broker往consumer发送消息的过程类似,一直发送消息,直到consumer发送消费成功确认。
    我们先不理会重复消息的问题,通过两次消息落地加补偿,下游是一定可以收到消息的。然后依赖状态机版本号等方式做判重,更新自己的业务,就实现了最终一致性。

最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。另外,所有不保证100%不丢消息的消息队列,理论上无法实现最终一致性。好吧,应该说理论上的100%,排除系统严重故障和bug。
像Kafka一类的设计,在设计层面上就有丢消息的可能(比如定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

广播

消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
比如本文开始提到的产品中心发布产品变更的消息,以及景点库很多去重更新的消息,可能“关心”方有很多个,但产品中心和景点库只需要发布变更消息即可,谁关心谁接入。

错峰与流控

试想上下游对于事情的处理能力是不同的。比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上前端。
这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。

总而言之,消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。
对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。
支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。
当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。
如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。

如何设计一个消息队列

综述

我们现在明确了消息队列的使用场景,下一步就是如何设计实现一个消息队列了。

基于消息的系统模型,不一定需要broker(消息队列服务端)。市面上的的Akka(actor模型)、ZeroMQ等,其实都是基于消息的系统设计范式,但是没有broker。
我们之所以要设计一个消息队列,并且配备broker,无外乎要做两件事情:

  1. 消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。
  2. 规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。
    掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次RPC做成两次RPC。发送者把消息投递到服务端(以下简称broker),服务端再将消息转发一手到接收端,就是这么简单。

一般来讲,设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。
利用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。
之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。
为了实现广播功能,我们必须要维护消费关系,可以利用zk/config server等保存消费关系。
在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。
下面我们会以设计消息队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,来具体分析设计实现一个消息队列时的方方面面。

实现队列基本功能

RPC通信协议

刚才讲到,所谓消息队列,无外乎两次RPC加一次转储,当然需要消费端最终做消费确认的情况是三次RPC。既然是RPC,就必然牵扯出一系列话题,什么负载均衡啊、服务发现啊、通信协议啊、序列化协议啊,等等。在这一块,我的强烈建议是不要重复造轮子。利用公司现有的RPC框架:Thrift也好,Dubbo也好,或者是其他自定义的框架也好。因为消息队列的RPC,和普通的RPC没有本质区别。当然了,自主利用Memchached或者Redis协议重新写一套RPC框架并非不可(如MetaQ使用了自己封装的Gecko NIO框架,卡夫卡也用了类似的协议)。但实现成本和难度无疑倍增。排除对效率的极端要求,都可以使用现成的RPC框架。
简单来讲,服务端提供两个RPC服务,一个用来接收消息,一个用来确认消息收到。并且做到不管哪个server收到消息和确认消息,结果一致即可。当然这中间可能还涉及跨IDC的服务的问题。这里和RPC的原则是一致的,尽量优先选择本机房投递。你可能会问,如果producer和consumer本身就在两个机房了,怎么办?首先,broker必须保证感知的到所有consumer的存在。其次,producer尽量选择就近的机房就好了。

高可用

其实所有的高可用,是依赖于RPC和存储的高可用来做的。先来看RPC的高可用,美团的基于MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服务自动发现,负载均衡等功能。而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,并且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。
那么怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息自然是幂等的。就算有单点故障,其他节点可以立刻顶上。另外failover可以依赖定时任务的补偿,这是消息队列本身天然就可以支持的功能。存储系统本身的可用性我们不需要操太多心,放心大胆的交给DBA们吧!
对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。需要保证每一个分区内的高可用性,也就是每一个分区至少要有一个主备且需要做数据的同步,关于这块HA的细节,可以参考下篇pull模型消息系统设计。

服务端承载消息堆积的能力

消息到达服务端如果不经过任何处理就到接收者了,broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择时机投递就显得是顺理成章的了。
只是这个存储可以做成很多方式。比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。
但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。
市面上的消息队列普遍两种形式都支持。当然具体的场景还要具体结合公司的业务来看。

存储子系统的选择

我们来看看如果需要数据落地的情况下各种存储子系统的选择。理论上,从速度来看,文件系统>分布式KV(持久化)>分布式文件系统>数据库,而可靠性却截然相反。还是要从支持的业务场景出发作出最合理的选择,如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择。
但是DB受制于IOPS,如果要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。整体上可以采用数据文件+索引文件的方式处理,具体这块的设计比较复杂,可以参考下篇的存储子系统设计。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其编程接口较友好,性能也比较可观,如果在可靠性要求不是那么高的场景,也不失为一个不错的选择。

消费关系解析

现在我们的消息队列初步具备了转储消息的能力。下面一个重要的事情就是解析发送接收关系,进行正确的消息投递了。
市面上的消息队列定义了一堆让人晕头转向的名词,如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。抛开现象看本质,无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。当然,对于互联网的大部分应用来说,组间广播、组内单播是最常见的情形。
消息需要通知到多个业务集群,而一个业务集群内有很多台机器,只要一台机器消费这个消息就可以了。
当然这不是绝对的,很多时候组内的广播也是有适用场景的,如本地缓存的更新等等。另外,消费关系除了组内组间,可能会有多级树状关系。这种情况太过于复杂,一般不列入考虑范围。所以,一般比较通用的设计是支持组间广播,不同的组注册不同的订阅。组内的不同机器,如果注册一个相同的ID,则单播;如果注册不同的ID(如IP地址+端口),则广播。
至于广播关系的维护,一般由于消息队列本身都是集群,所以都维护在公共存储上,如config server、zookeeper等。维护广播关系所要做的事情基本是一致的:

  1. 发送关系的维护。
  2. 发送关系变更时的通知。

队列高级特性设计

上面都是些消息队列基本功能的实现,下面来看一些关于消息队列特性相关的内容,不管可靠投递/消息丢失与重复以及事务乃至于性能,不是每个消息队列都会照顾到,所以要依照业务的需求,来仔细衡量各种特性实现的成本,利弊,最终做出最为合理的设计。

可靠投递(最终一致性)

这是个激动人心的话题,完全不丢消息,究竟可不可能?答案是,完全可能,前提是消息可能会重复,并且,在异常情况下,要接受消息的延迟。
方案说简单也简单,就是每当要发生不可靠的事情(RPC等)之前,先将消息落地,然后发送。当失败或者不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。
具体来说:

  1. producer往broker发送消息之前,需要做一次落地。
  2. 请求到server后,server确保数据落地后再告诉客户端发送成功。
  3. 支持广播的消息队列需要对每个待发送的endpoint,持久化一个发送状态,直到所有endpoint状态都OK才可删除消息。

对于各种不确定(超时、down机、消息没有送达、送达后数据没落地、数据落地了回复没收到),其实对于发送方来说,都是一件事情,就是消息没有送达。
重推消息所面临的问题就是消息重复。重复和丢失就像两个噩梦,你必须要面对一个。好在消息重复还有处理的机会,消息丢失再想找回就难了。
Anyway,作为一个成熟的消息队列,应该尽量在各个环节减少重复投递的可能性,不能因为重复有解决方案就放纵的乱投递。
最后说一句,不是所有的系统都要求最终一致性或者可靠投递,比如一个论坛系统、一个招聘系统。一个重复的简历或话题被发布,可能比丢失了一个发布显得更让用户无法接受。不断重复一句话,任何基础组件要服务于业务场景。

消费确认

当broker把消息投递给消费者后,消费者可以立即响应我收到了这个消息。但收到了这个消息只是第一步,我能不能处理这个消息却不一定。或许因为消费能力的问题,系统的负荷已经不能处理这个消息;或者是刚才状态机里面提到的消息不是我想要接收的消息,主动要求重发。
把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。所以,允许消费者主动进行消费确认是必要的。当然,对于没有特殊逻辑的消息,默认Auto Ack也是可以的,但一定要允许消费方主动ack。
对于正确消费ack的,没什么特殊的。但是对于reject和error,需要特别说明。reject这件事情,往往业务方是无法感知到的,系统的流量和健康状况的评估,以及处理能力的评估是一件非常复杂的事情。举个极端的例子,收到一个消息开始build索引,可能这个消息要处理半个小时,但消息量却是非常的小。所以reject这块建议做成滑动窗口/线程池类似的模型来控制,
消费能力不匹配的时候,直接拒绝,过一段时间重发,减少业务的负担。
但业务出错这件事情是只有业务方自己知道的,就像上文提到的状态机等等。这时应该允许业务方主动ack error,并可以与broker约定下次投递的时间。

重复消息和顺序消息

上文谈到重复消息是不可能100%避免的,除非可以允许丢失,那么,顺序消息能否100%满足呢? 答案是可以,但条件更为苛刻:

  1. 允许消息丢失。
  2. 从发送方到服务方到接受者都是单点单线程。

所以绝对的顺序消息基本上是不能实现的,当然在METAQ/Kafka等pull模型的消息队列中,单线程生产/消费,排除消息丢失,也是一种顺序消息的解决方案。
一般来讲,一个主流消息队列的设计范式里,应该是不丢消息的前提下,尽量减少重复消息,不保证消息的投递顺序。
谈到重复消息,主要是两个话题:

  1. 如何鉴别消息重复,并幂等的处理重复消息。
  2. 一个消息队列如何尽量减少重复消息的投递。

先来看看第一个话题,每一个消息应该有它的唯一身份。不管是业务方自定义的,还是根据IP/PID/时间戳生成的MessageId,如果有地方记录这个MessageId,消息到来是能够进行比对就
能完成重复的鉴定。数据库的唯一键/bloom filter/分布式KV中的key,都是不错的选择。由于消息不能被永久存储,所以理论上都存在消息从持久化存储移除的瞬间上游还在投递的可能(上游因种种原因投递失败,不停重试,都到了下游清理消息的时间)。这种事情都是异常情况下才会发生的,毕竟是小众情况。两分钟消息都还没送达,多送一次又能怎样呢?幂等的处理消息是一门艺术,因为种种原因重复消息或者错乱的消息还是来到了,说两种通用的解决方案:

  1. 版本号。
  2. 状态机。
版本号

举个简单的例子,一个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,如果不做重复性判断,显然最终状态是错误的。
但是,如果每个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。
每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另一条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。
如果业务方只关心消息重复不重复,那么问题就已经解决了。但很多时候另一个头疼的问题来了,就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则最后会发生状态错误。
参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。
如果到来的顺序是21,则先把2存起来,待1到来后,先处理1,再处理2,这样重复性和顺序性要求就都达到了。

状态机

基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:

  1. 对发送方必须要求消息带业务版本号。
  2. 下游必须存储消息的版本号,对于要严格保证顺序的。

还不能只存储最新的版本号的消息,要把乱序到来的消息都存储起来。而且必须要对此做出处理。试想一个永不过期的”session”,比如一个物品的状态,会不停流转于上下线。那么中间环节的所有存储
就必须保留,直到在某个版本号之前的版本一个不丢的到来,成本太高。
就刚才的场景看,如果消息没有版本号,该怎么解决呢?业务方只需要自己维护一个状态机,定义各种状态的流转关系。例如,”下线”状态只允许接收”上线”消息,“上线”状态只能接收“下线消息”,如果上线收到上线消息,或者下线收到下线消息,在消息不丢失和上游业务正确的前提下。要么是消息发重了,要么是顺序到达反了。这时消费者只需要把“我不能处理这个消息”告诉投递者,要求投递者过一段时间重发即可。而且重发一定要有次数限制,比如5次,避免死循环,就解决了。
举例子说明,假设产品本身状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态变成了3123。
那么下游收到3消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息1,发现是上线->上线,拒绝接收,要求重发。然后收到消息2,状态是上线->下线,于是接收这个消息。
此时无论重发的消息1或者3到来,还是可以接收。另外的重发,在一定次数拒绝后停止重发,业务正确。

中间件对于重复消息的处理

回归到消息队列的话题来讲。上述通用的版本号/状态机/ID判重解决方案里,哪些是消息队列该做的、哪些是消息队列不该做业务方处理的呢?其实这里没有一个完全严格的定义,但回到我们的出发点,我们保证不丢失消息的情况下尽量少重复消息,消费顺序不保证。那么重复消息下和乱序消息下业务的正确,应该是由消费方保证的,我们要做的是减少消息发送的重复。
我们无法定义业务方的业务版本号/状态机,如果API里强制需要指定版本号,则显得过于绑架客户了。况且,在消费方维护这么多状态,就涉及到一个消费方的消息落地/多机间的同步消费状态问题,复杂度指数级上升,而且只能解决部分问题。
减少重复消息的关键步骤:

  1. broker记录MessageId,直到投递成功后清除,重复的ID到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在server端产生重复消息。
  2. 对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问无果,再重发。

事务

持久性是事务的一个特性,然而只满足持久性却不一定能满足事务的特性。还是拿扣钱/加钱的例子讲。满足事务的一致性特征,则必须要么都不进行,要么都能成功。
解决方案从大方向上有两种:

  1. 两阶段提交,分布式事务。
  2. 本地事务,本地落地,补偿发送。

分布式事务存在的最大问题是成本太高,两阶段提交协议,对于仲裁down机或者单点故障,几乎是一个无解的黑洞。对于交易密集型或者I/O密集型的应用,没有办法承受这么高的网络延迟,系统复杂性。
并且成熟的分布式事务一定构建与比较靠谱的商用DB和商用中间件上,成本也太高。
那如何使用本地事务解决分布式事务的问题呢?以本地和业务在一个数据库实例中建表为例子,与扣钱的业务操作同一个事务里,将消息插入本地数据库。如果消息入库失败,则业务回滚;如果消息入库成功,事务提交。
然后发送消息(注意这里可以实时发送,不需要等定时任务检出,以提高消息实时性)。以后的问题就是前文的最终一致性问题所提到的了,只要消息没有发送成功,就一直靠定时任务重试。
这里有一个关键的点,本地事务做的,是业务落地和消息落地的事务,而不是业务落地和RPC成功的事务。这里很多人容易混淆,如果是后者,无疑是事务嵌套RPC,是大忌,会有长事务死锁等各种风险。
而消息只要成功落地,很大程度上就没有丢失的风险(磁盘物理损坏除外)。而消息只要投递到服务端确认后本地才做删除,就完成了producer->broker的可靠投递,并且当消息存储异常时,业务也是可以回滚的。
本地事务存在两个最大的使用障碍:

  1. 配置较为复杂,“绑架”业务方,必须本地数据库实例提供一个库表。
  2. 对于消息延迟高敏感的业务不适用。

话说回来,不是每个业务都需要强事务的。扣钱和加钱需要事务保证,但下单和生成短信却不需要事务,不能因为要求发短信的消息存储投递失败而要求下单业务回滚。所以,一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。另外事务的使用应该尽量低成本、透明化,可以依托于现有的成熟框架,如Spring的声明式事务做扩展。业务方只需要使用@Transactional标签即可。

性能相关

异步/同步

首先澄清一个概念,异步,同步和oneway是三件事。异步,归根结底你还是需要关心结果的,但可能不是当时的时间点关心,可以用轮询或者回调等方式处理结果;同步是需要当时关心
的结果的;而oneway是发出去就不管死活的方式,这种对于某些完全对可靠性没有要求的场景还是适用的,但不是我们重点讨论的范畴。
回归来看,任何的RPC都是存在客户端异步与服务端异步的,而且是可以任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步。
对于客户端来说,同步与异步主要是拿到一个Result,还是Future(Listenable)的区别。实现方式可以是线程池,NIO或者其他事件机制,这里先不展开讲。
服务端异步可能稍微难理解一点,这个是需要RPC协议支持的。参考servlet 3.0规范,服务端可以吐一个future给客户端,并且在future done的时候通知客户端。
整个过程可以参考下面的代码:

客户端同步服务端异步。

Future<Result> future = request(server);//server立刻返回future
synchronized(future){
while(!future.isDone()){
   future.wait();//server处理结束后会notify这个future,并修改isdone标志
}
}
return future.get();

客户端同步服务端同步。

Result result = request(server);

客户端异步服务端同步(这里用线程池的方式)。

Future<Result> future = executor.submit(new Callable(){public void call<Result>(){
    result = request(server);
}})
return future;

客户端异步服务端异步。

Future<Result> future = request(server);//server立刻返回future

return future

上面说了这么多,其实是想让大家脱离两个误区:

  1. RPC只有客户端能做异步,服务端不能。
  2. 异步只能通过线程池。

那么,服务端使用异步最大的好处是什么呢?说到底,是解放了线程和I/O。试想服务端有一堆I/O等待处理,如果每个请求都需要同步响应,每条消息都需要结果立刻返回,那么就几乎没法做I/O合并
(当然接口可以设计成batch的,但可能batch发过来的仍然数量较少)。而如果用异步的方式返回给客户端future,就可以有机会进行I/O的合并,把几个批次发过来的消息一起落地(这种合并对于MySQL等允许batch insert的数据库效果尤其明显),并且彻底释放了线程。不至于说来多少请求开多少线程,能够支持的并发量直线提高。
来看第二个误区,返回future的方式不一定只有线程池。换句话说,可以在线程池里面进行同步操作,也可以进行异步操作,也可以不使用线程池使用异步操作(NIO、事件)。
回到消息队列的议题上,我们当然不希望消息的发送阻塞主流程(前面提到了,server端如果使用异步模型,则可能因消息合并带来一定程度上的消息延迟),所以可以先使用线程池提交一个发送请求,主流程继续往下走。
但是线程池中的请求关心结果吗?Of course,必须等待服务端消息成功落地,才算是消息发送成功。所以这里的模型,准确地说事客户端半同步半异步(使用线程池不阻塞主流程,但线程池中的任务需要等待server端的返回),server端是纯异步。客户端的线程池wait在server端吐回的future上,直到server端处理完毕,才解除阻塞继续进行。
总结一句,同步能够保证结果,异步能够保证效率,要合理的结合才能做到最好的效率。

批量

谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。大处着眼来看,消费动作都是事件驱动的。主要事件包括:

  1. 攒够了一定数量。
  2. 到达了一定时间。
  3. 队列里有新的数据到来。

对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。
在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。伪代码如下:

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息
   }
});
public void send(Message message){
    queue.offer(message);
    executor.submit(task)
}

这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。延迟的最高程度由上一次发送的等待时间决定。但可能造成的问题是发送过快的话批量的大小不够满足性能的极致。

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
volatile long last = System.currentMills();
Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){
   flush();
},500,500,TimeUnits.MILLS);
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,顾做成全局的。
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。
   }
});
public void send(Message message){
    last = System.currentMills();
    queue.offer(message);
    flush();
}
private void flush(){
 if(queue.size>200||System.currentMills()-last>200){
       executor.submit(task)
  }
}

相反对于可以用适量的延迟来换取高性能的场景来说,用定时/定量二选一的方式可能会更为理想,既到达一定数量才发送,但如果数量一直达不到,也不能干等,有一个时间上限。
具体说来,在上文的submit之前,多判断一个时间和数量,并且Runnable内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。对于server端的数据落地,使用这种方式就非常方便。

最后啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?主要原因有两个:

  1. 减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
  2. 减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。

push还是pull

上文提到的消息队列,大多是针对push模型的设计。现在市面上有很多经典的也比较成熟的pull模型的消息队列,如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别,可谓另辟蹊径。
我们简要分析下push和pull模型各自存在的利弊。

慢消费

慢消费无疑是push模型最大的致命伤,穿成流水线来看,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。
反观pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于建立索引等慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。

消息延迟与忙等

这是pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待一段时间重新pull。
但等待多久就很难判定了。你可能会说,我可以有xx动态pull取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息,然后半个小时没有新消息产生,
可能你的算法算出下次最有可能到来的时间点是31分钟之后,或者60分钟之后,结果下条消息10分钟后到了,是不是很让人沮丧?
当然也不是说延迟就没有解决方案了,业界较成熟的做法是从短时间开始(不会对broker有太大负担),然后指数级增长等待。比如开始等5ms,然后10ms,然后20ms,然后40ms……直到有消息到来,然后再回到5ms。
即使这样,依然存在延迟问题:假设40ms到80ms之间的50ms消息到来,消息就延迟了30ms,而且对于半个小时来一次的消息,这些开销就是白白浪费的。
在阿里的RocketMq里,有一种优化的做法-长轮询,来平衡推拉模型各自的缺点。基本思路是:消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好~

顺序消息

如果push模式的消息队列,支持分区,单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能push送另外一个消息,还要发送者保证全局顺序唯一,听起来也能做顺序消息,但成本太高了,尤其是必须每个消息消费确认后才能发下一条消息,这对于本身堆积能力和慢消费就是瓶颈的push模式的消息队列,简直是一场灾难。
反观pull模式,如果想做到全局顺序消息,就相对容易很多:

  1. producer对应partition,并且单线程。
  2. consumer对应partition,消费确认(或批量确认),继续消费即可。

所以对于日志push送这种最好全局有序,但允许出现小误差的场景,pull模式非常合适。如果你不想看到通篇乱套的日志~~
Anyway,需要顺序消息的场景还是比较有限的而且成本太高,请慎重考虑。

总结

本文从为何使用消息队列开始讲起,然后主要介绍了如何从零开始设计一个消息队列,包括RPC、事务、最终一致性、广播、消息确认等关键问题。并对消息队列的push、pull模型做了简要分析,最后从批量和异步角度,分析了消息队列性能优化的思路。下篇会着重介绍一些高级话题,如存储系统的设计、流控和错峰的设计、公平调度等。希望通过这些,让大家对消息队列有个提纲挈领的整体认识,并给自主开发消息队列提供思路。另外,本文主要是源自自己在开发消息队列中的思考和读源码时的体会,比较不”官方”,也难免会存在一些漏洞,欢迎大家多多交流。

后续我们还会推出消息队列设计高级篇,内容会涵盖以下方面:

  • pull模型消息系统设计理念
  • 存储子系统设计
  • 流量控制
  • 公平调度

敬请期待哦~

作者简介

王烨,现在是美团旅游后台研发组的程序猿,之前曾经在百度、去哪和优酷工作过,专注Java后台开发。对于网络编程和并发编程具有浓厚的兴趣,曾经做过一些基础组件,也翻过一些源码,属于比较典型的宅男技术控。期待能够与更多知己,在coding的路上并肩前行~

原文出自:https://tech.meituan.com/archives

美团外卖订单中心的演进

美团点评阅读(1896)

前言

美团外卖从2013年9月成交第一单以来,已走过了三个年头。期间,业务飞速发展,美团外卖由日均几单发展为日均500万单(9月11日已突破600万)的大型O2O互联网外卖服务平台。平台支持的品类也由最初外卖单品拓展为全品类。

随着订单量的增长、业务复杂度的提升,外卖订单系统也在不断演变进化,从早期一个订单业务模块到现在分布式可扩展的高性能、高可用、高稳定订单系统。整个发展过程中,订单系统经历了几个明显的阶段,下面本篇文章将为大家介绍一下订单系统的演进过程,重点关注各阶段的业务特征、挑战及应对之道。

为方便大家更好地了解整个演进过程,我们首先看一下外卖业务。

外卖订单业务

外卖订单业务是一个需要即时送的业务,对实时性要求很高。从用户订餐到最终送达用户,一般在1小时内。如果最终送达用户时间变长,会带来槽糕的用户体验。在1小时内,订单会快速经过多个阶段,直到最终送达用户。各个阶段需要紧密配合,确保订单顺利完成。

下图是一个用户视角的订单流程图。

订单流程图

从普通用户的角度来看,一个外卖订单从下单后,会经历支付、商家接单、配送、用户收货、售后及订单完成多个阶段。以技术的视角来分解的话,每个阶段依赖于多个子服务来共同完成,比如下单会依赖于购物车、订单预览、确认订单服务,这些子服务又会依赖于底层基础系统来完成其功能。

外卖业务另一个重要特征是一天内订单量会规律变化,订单会集中在中午、晚上两个“饭点”附近,而其它时间的订单量较少。这样,饭点附近系统压力会相对较大。

下图是一天内的外卖订单量分布图

订单量分布图

总结而言,外卖业务具有如下特征:

  • 流程较长且实时性要求高;
  • 订单量高且集中。

下面将按时间脉络为大家讲解订单系统经历的各个阶段、各阶段业务特征、挑战以及应对之道。

订单系统雏型

外卖业务发展早期,第一目标是要能够快速验证业务的可行性。技术上,我们需要保证架构足够灵活、快速迭代从而满足业务快速试错的需求。

在这个阶段,我们将订单相关功能组织成模块,与其它模块(门店模块等)一起形成公用jar包,然后各个系统通过引入jar包来使用订单功能。

早期系统的整体架构图如下所示:

订单系统雏型

早期,外卖整体架构简单、灵活,公共业务逻辑通过jar包实现后集成到各端应用,应用开发部署相对简单。比较适合业务早期逻辑简单、业务量较小、需要快速迭代的情况。但是,随着业务逻辑的复杂、业务量的增长,单应用架构的弊端逐步暴露出来。系统复杂后,大家共用一个大项目进行开发部署,协调的成本变高;业务之间相互影响的问题也逐渐增多。

早期业务处于不断试错、快速变化、快速迭代阶段,通过上述架构,我们能紧跟业务,快速满足业务需求。随着业务的发展以及业务的逐步成熟,我们对系统进行逐步升级,从而更好地支持业务。

独立的订单系统

2014年4月,外卖订单量达到了10万单/日,而且订单量还在持续增长。这时候,业务大框架基本成型,业务在大框架基础上快速迭代。大家共用一个大项目进行开发部署,相互影响,协调成本变高;多个业务部署于同一VM,相互影响的情况也在增多。

为解决开发、部署、运行时相互影响的问题。我们将订单系统进行独立拆分,从而独立开发、部署、运行,避免受其它业务影响。

系统拆分主要有如下几个原则:

  • 相关业务拆分独立系统;
  • 优先级一致的业务拆分独立系统;
  • 拆分系统包括业务服务和数据。

基于以上原则,我们将订单系统进行独立拆分,所有订单服务通过RPC接口提供给外部使用。订单系统内部,我们将功能按优先级拆分为不同子系统,避免相互影响。订单系统通过MQ(队列)消息,通知外部订单状态变更。

独立拆分后的订单系统架构如下所示:

独立拆分后的订单系统

其中,最底层是数据存储层,订单相关数据独立存储。订单服务层,我们按照优先级将订单服务划分为三个系统,分别为交易系统、查询系统、异步处理系统。

独立拆分后,可以避免业务间的相互影响。快速支持业务迭代需求的同时,保障系统稳定性。

高性能、高可用、高稳定的订单系统

订单系统经过上述独立拆分后,有效地避免了业务间的相互干扰,保障迭代速度的同时,保证了系统稳定性。这时,我们的订单量突破百万,而且还在持续增长。之前的一些小问题,在订单量增加后,被放大,进而影响用户体验。比如,用户支付成功后,极端情况下(比如网络、数据库问题)会导致支付成功消息处理失败,用户支付成功后依然显示未支付。订单量变大后,问题订单相应增多。我们需要提高系统的可靠性,保证订单功能稳定可用。

另外,随着订单量的增长、订单业务的复杂,对订单系统的性能、稳定性、可用性等提出了更高的要求。

为了提供更加稳定、可靠的订单服务,我们对拆分后的订单系统进行进一步升级。下面将分别介绍升级涉及的主要内容。

性能优化

系统独立拆分后,可以方便地对订单系统进行优化升级。我们对独立拆分后的订单系统进行了很多的性能优化工作,提升服务整体性能,优化工作主要涉及如下几个方面。

异步化

服务所需要处理的工作越少,其性能自然越高。可以通过将部分操作异步化来减少需要同步进行的操作,进而提升服务的性能。异步化有两种方案。

  • 线程或线程池:将异步操作放在单独线程中处理,避免阻塞服务线程;
  • 消息异步:异步操作通过接收消息完成。

异步化带来一个隐患,如何保障异步操作的执行。这个场景主要发生在应用重启时,对于通过线程或线程池进行的异步化,JVM重启时,后台执行的异步操作可能尚未完成。这时,需要通过JVM优雅关闭来保证异步操作进行完成后,JVM再关闭。通过消息来进行的,消息本身已提供持久化,不受应用重启影响。

具体到订单系统,我们通过将部分不必同步进行的操作异步化,来提升对外服务接口的性能。不需要立即生效的操作即可以异步进行,比如发放红包、PUSH推送、统计等。

以订单配送PUSH推送为例,将PUSH推送异步化后的处理流程变更如下所示:

异步化

PUSH异步化后,线程#1在更新订单状态、发送消息后立即返回,而不用同步等待PUSH推送完成。而PUSH推送异步在线程#2中完成。

并行化

操作并行化也是提升性能的一大利器,并行化将原本串行的工作并行执行,降低整体处理时间。我们对所有订单服务进行分析,将其中非相互依赖的操作并行化,从而提升整体的响应时间。

以用户下单为例,第一步是从各个依赖服务获取信息,包括门店、菜品、用户信息等。获取这些信息并不需要相互依赖,故可以将其并行化,并行后的处理流程变更如下所示:

并行化

通过将获取信息并行化,可有效缩短下单时间,提升下单接口性能。

缓存

通过将统计信息进行提前计算后缓存,避免获取数据时进行实时计算,从而提升获取统计数据的服务性能。比如对于首单、用户已减免配送费等,通过提前计算后缓存,可以简化实时获取数据逻辑,节约时间。

以用户已减免配送费为例,如果需要实时计算,则需要取到用户所有订单后,再进行计算,这样实时计算成本较高。我们通过提前计算,缓存用户已减免配送费。需要取用户已减免配送费时,从缓存中取即可,不必实时计算。具体来说,包括如下几点:

  • 通过缓存保存用户已减免配送费;
  • 用户下单时,如果订单有减免配送费,增加缓存中用户减免配送费金额(异步进行);
  • 订单取消时,如果订单有减免配送费,减少缓存中用户减免配送费金额(异步进行);

一致性优化

订单系统涉及交易,需要保证数据的一致性。否则,一旦出现问题,可能会导致订单不能及时配送、交易金额不对等。

交易一个很重要的特征是其操作具有事务性,订单系统是一个复杂的分布式系统,比如支付涉及订单系统、支付平台、支付宝/网银等第三方。仅通过传统的数据库事务来保障不太可行。对于订单交易系统的事务性,并不要求严格满足传统数据库事务的ACID性质,只需要最终结果一致即可。针对订单系统的特征,我们通过如下种方式来保障最终结果的一致性。

重试/幂等

通过延时重试,保证操作最终会最执行。比如退款操作,如退款时遇到网络或支付平台故障等问题,会延时进行重试,保证退款最终会被完成。重试又会带来另一个问题,即部分操作重复进行,需要对操作进行幂等处理,保证重试的正确性。

以退款操作为例,加入重试/幂等后的处理流程如下所示:

引入重试的退款流程

退款操作首先会检查是否已经退款,如果已经退款,直接返回。否则,向支付平台发起退款,从而保证操作幂等,避免重复操作带来问题。如果发起退款失败(比如网络或支付平台故障),会将任务放入延时队列,稍后重试。否则,直接返回。

通过重试+幂等,可以保证退款操作最终一定会完成。

2PC

2PC是指分布式事务的两阶段提交,通过2PC来保证多个系统的数据一致性。比如下单过程中,涉及库存、优惠资格等多个资源,下单时会首先预占资源(对应2PC的第一阶段),下单失败后会释放资源(对应2PC的回滚阶段),成功后会使用资源(对应2PC的提交阶段)。对于2PC,网上有大量的说明,这里不再继续展开。

高可用

分布式系统的可用性由其各个组件的可用性共同决定,要提升分布式系统的可用性,需要综合提升组成分布式系统的各个组件的可用性。

针对订单系统而言,其主要组成组件包括三类:存储层、中间件层、服务层。下面将分层说明订单系统的可用性。

存储层

存储层的组件如MySQL、ES等本身已经实现了高可用,比如MySQL通过主从集群、ES通过分片复制来实现高可用。存储层的高可用依赖各个存储组件即可。

中间件层

分布式系统会大量用到各类中间件,比如服务调用框架等,这类中间件一般使用开源产品或由公司基础平台提供,本身已具备高可用。

服务层

在分布式系统中,服务间通过相互调用来完成业务功能,一旦某个服务出现问题,会级联影响调用方服务,进而导致系统崩溃。分布式系统中的依赖容灾是影响服务高可用的一个重要方面。

依赖容灾主要有如下几个思路:

  • 依赖超时设置;
  • 依赖灾备;
  • 依赖降级;
  • 限制依赖使用资源;

订单系统会依赖多个其它服务,也存在这个问题。当前订单系统通过同时采用上述四种方法,来避免底层服务出现问题时,影响整体服务。具体实现上,我们采用Hystrix框架来完成依赖容灾功能。Hystrix框架采用上述四种方法,有效实现依赖容灾。订单系统依赖容灾示意图如下所示

资源软隔离

通过为每个依赖服务设置独立的线程池、合理的超时时间及出错时回退方法,有效避免服务出现问题时,级联影响,导致整体服务不可用,从而实现服务高可用。

另外,订单系统服务层都是无状态服务,通过集群+多机房部署,可以避免单点问题及机房故障,实现高可用。

小结

上面都是通过架构、技术实现层面来保障订单系统的性能、稳定性、可用性。实际中,有很多的事故是人为原因导致的,除了好的架构、技术实现外,通过规范、制度来规避人为事故也是保障性能、稳定性、可用性的重要方面。订单系统通过完善需求review、方案评审、代码review、测试上线、后续跟进流程来避免人为因素影响订单系统稳定性。

通过以上措施,我们将订单系统建设成了一个高性能、高稳定、高可用的分布式系统。其中,交易系统tp99为150ms、查询系统tp99时间为40ms。整体系统可用性为6个9。

可扩展的订单系统

订单系统经过上面介绍的整体升级后,已经是一个高性能、高稳定、高可用的分布式系统。但是系统的的可扩展性还存在一定问题,部分服务只能通过垂直扩展(增加服务器配置)而不能通过水平扩展(加机器)来进行扩容。但是,服务器配置有上限,导致服务整体容量受到限制。

到2015年5月的时候,这个问题就比较突出了。当时,数据库服务器写接近单机上限。业务预期还会继续快速增长。为保障业务的快速增长,我们对订单系统开始进行第二次升级。目标是保证系统有足够的扩展性,从而支撑业务的快速发展。

分布式系统的扩展性依赖于分布式系统中各个组件的可扩展性,针对订单系统而言,其主要组成组件包括三类:存储层、中间件层、服务层。下面将分层说明如何提高各层的可扩展性。

存储层

订单系统存储层主要依赖于MySQL持久化、tair/redis cluster缓存。tair/redis cluster缓存本身即提供了很好的扩展性。MySQL可以通过增加从库来解决读扩展问题。但是,对于写MySQL存在单机容量的限制。另外,数据库的整体容量受限于单机硬盘的限制。

存储层的可扩展性改造主要是对MySQL扩展性改造。

  • 分库分表

写容量限制是受限于MySQL数据库单机处理能力限制。如果能将数据拆为多份,不同数据放在不同机器上,就可以方便对容量进行扩展。

对数据进行拆分一般分为两步,第一步是分库,即将不同表放不同库不同机器上。经过第一步分库后,容量得到一定提升。但是,分库并不能解决单表容量超过单机限制的问题,随着业务的发展,订单系统中的订单表即遇到了这个问题。

针对订单表超过单库容量的问题,需要进行分表操作,即将订单表数据进行拆分。单表数据拆分后,解决了写的问题,但是如果查询数据不在同一个分片,会带来查询效率的问题(需要聚合多张表)。由于外卖在线业务对实时性、性能要求较高。我们针对每个主要的查询维度均保存一份数据(每份数据按查询维度进行分片),方便查询。

具体来说,外卖主要涉及三个查询维度:订单ID、用户ID、门店ID。对订单表分表时,对于一个订单,我们存三份,分别按照订单ID、用户ID、 门店ID以一定规则存储在每个维度不同分片中。这样,可以分散写压力,同时,按照订单ID、用户ID、门店ID三个维度查询时,数据均在一个分片,保证较高的查询效率。

订单表分表后,订单表的存储架构如下所示:

分库分表

可以看到,分表后,每个维度共有100张表,分别放在4个库上面。对于同一个订单,冗余存储了三份。未来,随着业务发展,还可以继续通过将表分到不同机器上来持续获得容量的提升。

分库分表后,订单数据存储到多个库多个表中,为应用层查询带来一定麻烦,解决分库分表后的查询主要有三种方案:

  • MySQL服务器端支持:目前不支持。
  • 中间件。
  • 应用层。

由于MySQL服务器端不能支持,我们只剩下中间件和应用层两个方案。中间件方案对应用透明,但是开发难度相对较大,当时这块没有资源去支持。于是,我们采用应用层方案来快速支持。结合应用开发框架(SPRING+MYBATIS),我们实现了一个轻量级的分库分表访问插件,避免将分库分表逻辑嵌入到业务代码。分库分表插件的实现包括如下几个要点。

  • 配置文件管理分库分表配置信息;
  • JAVA注解说明SQL语句分库分表信息;
  • JAVA AOP解析注解+查询配置文件,获取数据源及表名;
  • MYBATIS动态替换表名;
  • SPRING动态替换数据源。

通过分库分表,解决了写容量扩展问题。但是分表后,会给查询带来一定的限制,只能支持主要维度的查询,其它维度的查询效率存在问题。

  • ES搜索

订单表分表之后,对于ID、用户ID、门店ID外的查询(比如按照手机号前缀查询)存在效率问题。这部分通常是复杂查询,可以通过全文搜索来支持。在订单系统中,我们通过ES来解决分表后非分表维度的复杂查询效率问题。具体来说,使用ES,主要涉及如下几点。

  • 通过databus将订单数据同步到ES。
  • 同步数据时,通过批量写入来降低ES写入压力。
  • 通过ES的分片机制来支持扩展性。

小结

通过对存储层的可扩展性改造,使得订单系统存储层具有较好的可扩展性。对于中间层的可扩展性与上面提到的中间层可用性一样,中间层本身已提供解决方案,直接复用即可。对于服务层,订单系统服务层提供的都是无状态服务,对于无状态服务,通过增加机器,即可获得更高的容量,完成扩容。

通过对订单系统各层可扩展性改造,使得订单系统具备了较好的可扩展性,能够支持业务的持续发展,当前,订单系统已具体千万单/日的容量。

上面几部分都是在介绍如何通过架构、技术实现等手段来搭建一个可靠、完善的订单系统。但是,要保障系统的持续健康运行,光搭建系统还不够,运维也是很重要的一环。

智能运维的订单系统

早期,对系统及业务的运维主要是采用人肉的方式,即外部反馈问题,RD通过排查日志等来定位问题。随着系统的复杂、业务的增长,问题排查难度不断加大,同时反馈问题的数量也在逐步增多。通过人肉方式效率偏低,并不能很好的满足业务的需求。

为提升运维效率、降低人力成本,我们对系统及业务运维进行自动化、智能化改进,改进包括事前、事中、事后措施。

  • 事前措施

事前措施的目的是为提前发现隐患,提前解决,避免问题恶化。

在事前措施这块,我们主要采取如下几个手段:

  1. 定期线上压测:通过线上压测,准确评估系统容量,提前发现系统隐患;
  2. 周期性系统健康体检:通过周期检测CPU利用率、内存利用率、接口QPS、接口TP95、异常数,取消订单数等指标是否异常,可以提前发现提前发现潜在问题、提前解决;
  3. 全链路关键日志:通过记录全链路关键日志,根据日志,自动分析反馈订单问题原因,给出处理结果,有效提高反馈处理效率。
  • 事中措施

事中措施的目的是为及时发现问题、快速解决问题。

事中这块,我们采取的手段包括:

  1. 订单监控大盘:实时监控订单业务指标,异常时报警;
  2. 系统监控大盘:实时监控订单系统指标,异常时报警;
  3. 完善的SOP:报警后,通过标准流程,快速定位问题、解决问题。
  • 事后措施

事后措施是指问题发生后,分析问题原因,彻底解决。并将相关经验教训反哺给事前、事中措施,不断加强事先、事中措施,争取尽量提前发现问题,将问题扼杀在萌芽阶段。

通过将之前人肉进行的运维操作自动化、智能化,提升了处理效率、减少了运维的人力投入。

看完以后是不是想拍个砖、留个言,抒发下己见?可以来微信公众号给我们评论,还能在第一时间获取我们发布的一些实践经验总结、最新的活动报名信息。关注可扫码:

公众号二维码

原文出自:https://tech.meituan.com/archives

HDFS NameNode内存全景

美团点评阅读(1905)

一、概述

从整个HDFS系统架构上看,NameNode是其中最重要、最复杂也是最容易出现问题的地方,而且一旦NameNode出现故障,整个Hadoop集群就将处于不可服务的状态,同时随着数据规模和集群规模地持续增长,很多小量级时被隐藏的问题逐渐暴露出来。所以,从更高层次掌握NameNode的内部结构和运行机制尤其重要。除特别说明外,本文基于社区版本Hadoop-2.4.1[1][2],虽然2.4.1之后已经有多次版本迭代,但是基本原理相同。

NameNode管理着整个HDFS文件系统的元数据。从架构设计上看,元数据大致分成两个层次:Namespace管理层,负责管理文件系统中的树状目录结构以及文件与数据块的映射关系;块管理层,负责管理文件系统中文件的物理块与实际存储位置的映射关系BlocksMap,如图1所示[1]。Namespace管理的元数据除内存常驻外,也会周期Flush到持久化设备上FsImage文件;BlocksMap元数据只在内存中存在;当NameNode发生重启,首先从持久化设备中读取FsImage构建Namespace,之后根据DataNode的汇报信息重新构造BlocksMap。这两部分数据结构是占据了NameNode大部分JVM Heap空间。

HDFS结构图

除了对文件系统本身元数据的管理之外,NameNode还需要维护整个集群的机架及DataNode的信息、Lease管理以及集中式缓存引入的缓存管理等等。这几部分数据结构空间占用相对固定,且占用较小。

测试数据显示,Namespace目录和文件总量到2亿,数据块总量到3亿后,常驻内存使用量超过90GB。

二、内存全景

如前述,NameNode整个内存结构大致可以分成四大部分:Namespace、BlocksMap、NetworkTopology及其它,图2为各数据结构内存逻辑分布图示。

namenode内存全景图

Namespace:维护整个文件系统的目录树结构及目录树上的状态变化;
BlockManager:维护整个文件系统中与数据块相关的信息及数据块的状态变化;
NetworkTopology:维护机架拓扑及DataNode信息,机架感知的基础;
其它:
LeaseManager:读写的互斥同步就是靠Lease实现,支持HDFS的Write-Once-Read-Many的核心数据结构;
CacheManager:Hadoop 2.3.0引入的集中式缓存新特性,支持集中式缓存的管理,实现memory-locality提升读性能;
SnapshotManager:Hadoop 2.1.0引入的Snapshot新特性,用于数据备份、回滚,以防止因用户误操作导致集群出现数据问题;
DelegationTokenSecretManager:管理HDFS的安全访问;
另外还有临时数据信息、统计信息metrics等等。

NameNode常驻内存主要被Namespace和BlockManager使用,二者使用占比分别接近50%。其它部分内存开销较小且相对固定,与Namespace和BlockManager相比基本可以忽略。

三、内存分析

3.1 Namespace

与单机文件系统相似,HDFS对文件系统的目录结构也是按照树状结构维护,Namespace保存了目录树及每个目录/文件节点的属性。除在内存常驻外,这部分数据会定期flush到持久化设备上,生成一个新的FsImage文件,方便NameNode发生重启时,从FsImage及时恢复整个Namespace。图3所示为Namespace内存结构。前述集群中目录和文件总量即整个Namespace目录树中包含的节点总数,可见Namespace本身其实是一棵非常巨大的树。

Namespace内存结构

在整个Namespace目录树中存在两种不同类型的INode数据结构:INodeDirectory和INodeFile。其中INodeDirectory标识的是目录树中的目录,INodeFile标识的是目录树中的文件。由于二者均继承自INode,所以具备大部分相同的公共信息INodeWithAdditionalFields,除常用基础属性外,其中还提供了扩展属性features,如Quota、Snapshot等均通过Feature增加,如果以后出现新属性也可通过Feature方便扩展。不同的是,INodeFile特有的标识副本数和数据块大小组合的header(2.6.1之后又新增了标识存储策略ID的信息)及该文件包含的有序Blocks数组;INodeDirectory则特有子节点的列表children。这里需要特别说明children是默认大小为5的ArrayList,按照子节点name有序存储,虽然在插入时会损失一部分写性能,但是可以方便后续快速二分查找提高读性能,对一般存储系统,读操作比写操作占比要高。具体的继承关系见图4所示。

INode继承关系

3.2 BlockManager

BlocksMap在NameNode内存空间占据很大比例,由BlockManager统一管理,相比Namespace,BlockManager管理的这部分数据要复杂的多。Namespace与BlockManager之间通过前面提到的INodeFile有序Blocks数组关联到一起。图5所示BlockManager管理的内存结构。

BlockManager管理的内存结构

每一个INodeFile都会包含数量不等的Block,具体数量由文件大小及每一个Block大小(默认为64M)比值决定,这些Block按照所在文件的先后顺序组成BlockInfo数组,如图5所示的BlockInfo[A~K],BlockInfo维护的是Block的元数据,结构如图6所示,数据本身是由DataNode管理,所以BlockInfo需要包含实际数据到底由哪些DataNode管理的信息,这里的核心是名为triplets的Object数组,大小为3*replicas,其中replicas是Block副本数量。triplets包含的信息:

  • triplets[i]:Block所在的DataNode;
  • triplets[i+1]:该DataNode上前一个Block;
  • triplets[i+2]:该DataNode上后一个Block;

其中i表示的是Block的第i个副本,i取值[0,replicas)。

BlockInfo继承关系

从前面描述可以看到BlockInfo几块重要信息:文件包含了哪些Block,这些Block分别被实际存储在哪些DataNode上,DataNode上所有Block前后链表关系。

如果从信息完整度来看,以上数据足够支持所有关于HDFS文件系统的正常操作,但还存在一个使用场景较多的问题:不能通过blockid快速定位Block,所以引入了BlocksMap。

BlocksMap底层通过LightWeightGSet实现,本质是一个链式解决冲突的哈希表。为了避免rehash过程带来的性能开销,初始化时,索引空间直接给到了整个JVM可用内存的2%,并且不再变化。集群启动过程,DataNode会进行BR(BlockReport),根据BR的每一个Block计算其HashCode,之后将对应的BlockInfo插入到相应位置逐渐构建起来巨大的BlocksMap。前面在INodeFile里也提到的BlockInfo集合,如果我们将BlocksMap里的BlockInfo与所有INodeFile里的BlockInfo分别收集起来,可以发现两个集合完全相同,事实上BlocksMap里所有的BlockInfo就是INodeFile中对应BlockInfo的引用;通过Block查找对应BlockInfo时,也是先对Block计算HashCode,根据结果快速定位到对应的BlockInfo信息。至此涉及到HDFS文件系统本身元数据的问题基本上已经解决了。

前面提到部分都属于静态数据部分,NameNode内存中所有数据都要随读写情况发生变化,BlockManager当然也需要管理这部分动态数据。主要是当Block发生变化不符合预期时需要及时调整Blocks的分布。这里涉及几个核心的数据结构:

excessReplicateMap:若某个Block实际存储的副本数多于预设副本数,这时候需要删除多余副本,这里多余副本会被置于excessReplicateMap中。excessReplicateMap是从DataNode的StorageID到Block集合的映射集。
neededReplications:若某个Block实际存储的副本数少于预设副本数,这时候需要补充缺少副本,这里哪些Block缺少多少个副本都统一存在neededReplications里,本质上neededReplications是一个优先级队列,缺少副本数越多的Block之后越会被优先处理。
invalidateBlocks:若某个Block即将被删除,会被置于invalidateBlocks中。invalidateBlocks是从DataNode的StorageID到Block集合的映射集。如某个文件被客户端执行了删除操作,该文件所属的所有Block会先被置于invalidateBlocks中。
corruptReplicas:有些场景Block由于时间戳/长度不匹配等等造成Block不可用,会被暂存在corruptReplicas中,之后再做处理。

前面几个涉及到Block分布情况动态变化的核心数据结构,这里的数据实际上是过渡性质的,BlockManager内部的ReplicationMonitor线程(图5标识Thread/Monitor)会持续从其中取出数据并通过逻辑处理后分发给具体的DatanodeDescriptor对应数据结构(3.3 NetworkTopology里会有简单介绍),当对应DataNode的心跳过来之后,NameNode会遍历DatanodeDescriptor里暂存的数据,将其转换成对应指令返回给DataNode,DataNode收到任务并执行完成后再反馈回NameNode,之后DatanodeDescriptor里对应信息被清除。如BlockB预设副本数为3,由于某种原因实际副本变成4(如之前下线的DataNode D重新上线,其中B正好有BlockB的一个副本数据),BlockManager能及时发现副本变化,并将多余的DataNode D上BlockB副本放置到excessReplicateMap中,ReplicationMonitor线程定期检查时发现excessReplicateMap中数据后将其移到DataNode D对应DatanodeDescriptor中invalidateBlocks里,当DataNode D下次心跳过来后,随心跳返回删除Block B的指令,DataNode D收到指令实际删除其上的Block B数据并反馈回NameNode,此后BlockManager将DataNode D上的Block B从内存中清除,至此Block B的副本符合预期,整个流程如图7所示。

副本数异常时处理过程

3.3 NetworkTopology

前面多次提到Block与DataNode之间的关联关系,事实上NameNode确实还需要管理所有DataNode,不仅如此,由于数据写入前需要确定数据块写入位置,NameNode还维护着整个机架拓扑NetworkTopology。图8所示内存中机架拓扑图。

NetworkTopology内存结构

从图8可以看出这里包含两个部分:机架拓扑结构NetworkTopology和DataNode节点信息。其中树状的机架拓扑是根据机架感知(一般都是外部脚本计算得到)在集群启动完成后建立起来,整个机架的拓扑结构在NameNode的生命周期内一般不会发生变化;另一部分是比较关键的DataNode信息,BlockManager已经提到每一个DataNode上的Blocks集合都会形成一个双向链表,更准确的应该是DataNode的每一个存储单元DatanodeStorageInfo上的所有Blocks集合会形成一个双向链表,这个链表的入口就是机架拓扑结构叶子节点即DataNode管理的DatanodeStorageInfo。此外由于上层应用对数据的增删查随时发生变化,随之DatanodeStorageInfo上的Blocks也会动态变化,所以NetworkTopology上的DataNode对象还会管理这些动态变化的数据结构,如replicateBlocks/recoverBlocks/invalidateBlocks,这些数据结构正好和BlockManager管理的动态数据结构对应,实现了数据的动态变化由BlockManager传达到DataNode内存对象最后通过指令下达到物理DataNode实际执行的流动过程,流程在3.2 BlockManager已经介绍。

这里存在一个问题,为什么DatanodeStorageInfo下所有Block之间会以双向链表组织,而不是其它数据结构?如果结合实际场景就不难发现,对每一个DatanodeStorageInfo下Block的操作集中在快速增加/删除(Block动态增减变化)及顺序遍历(BlockReport期间),所以双向链表是非常合适的数据结构。

3.4 LeaseManager

Lease 机制是重要的分布式协议,广泛应用于各种实际的分布式系统中。HDFS支持Write-Once-Read-Many,对文件写操作的互斥同步靠Lease实现。Lease实际上是时间约束锁,其主要特点是排他性。客户端写文件时需要先申请一个Lease,一旦有客户端持有了某个文件的Lease,其它客户端就不可能再申请到该文件的Lease,这就保证了同一时刻对一个文件的写操作只能发生在一个客户端。NameNode的LeaseManager是Lease机制的核心,维护了文件与Lease、客户端与Lease的对应关系,这类信息会随写数据的变化实时发生对应改变。

LeaseManager的内存数据结构

图9所示为LeaseManager内存结构,包括以下三个主要核心数据结构:

sortedLeases:Lease集合,按照时间先后有序组织,便于检查Lease是否超时;
leases:客户端到Lease的映射关系;
sortedLeasesByPath:文件路径到Lease的映射关系;

其中每一个写数据的客户端会对应一个Lease,每个Lease里包含至少一个标识文件路径的Path。Lease本身已经维护了其持有者(客户端)及该Lease正在操作的文件路径集合,之所以增加了leases和sortedLeasesByPath为提高通过Lease持有者或文件路径快速索引到Lease的性能。

由于Lease本身的时间约束特性,当Lease发生超时后需要强制回收,内存中与该Lease相关的内容要被及时清除。超时检查及超时后的处理逻辑由LeaseManager.Monitor统一执行。LeaseManager中维护了两个与Lease相关的超时时间:软超时(softLimit)和硬超时(hardLimit),使用场景稍有不同。

正常情况下,客户端向集群写文件前需要向NameNode的LeaseManager申请Lease;写文件过程中定期更新Lease时间,以防Lease过期,周期与softLimit相关;写完数据后申请释放Lease。整个过程可能发生两类问题:(1)写文件过程中客户端没有及时更新Lease时间;(2)写完文件后没有成功释放Lease。两个问题分别对应为softLimit和hardLimit。两种场景都会触发LeaseManager对Lease超时强制回收。如果客户端写文件过程中没有及时更新Lease超过softLimit时间后,另一客户端尝试对同一文件进行写操作时触发Lease软超时强制回收;如果客户端写文件完成但是没有成功释放Lease,则会由LeaseManager的后台线程LeaseManager.Monitor检查是否硬超时后统一触发超时回收。不管是softLimit还是hardLimit超时触发的强制Lease回收,处理逻辑都一样:FSNamesystem.internalReleaseLease,逻辑本身比较复杂,这里不再展开,简单的说先对Lease过期前最后一次写入的Block进行检查和修复,之后释放超时持有的Lease,保证后面其它客户端的写入能够正常申请到该文件的Lease。

NameNode内存数据结构非常丰富,这里对几个重要的数据结构进行了简单的描述,除了前面罗列之外,其实还有如SnapShotManager/CacheManager等,由于其内存占用有限且有一些特性还尚未稳定,这里不再展开。

四、问题

随着集群中数据规模的不断积累,NameNode内存占用随之成比例增长。不可避免的NameNode内存将逐渐成为集群发展的瓶颈,并开始暴漏诸多问题。

1、启动时间变长。NameNode的启动过程可以分成FsImage数据加载、editlogs回放、Checkpoint、DataNode的BlockReport几个阶段。数据规模较小时,启动时间可以控制在~10min以内,当元数据规模达到5亿(Namespace中INode数超过2亿,Block数接近3亿),FsImage文件大小将接近到20GB,加载FsImage数据就需要~14min,Checkpoint需要~6min,再加上其它阶段整个重启过程将持续~50min,极端情况甚至超过60min,虽然经过多轮优化重启过程已经能够稳定在~30min,但也非常耗时。如果数据规模继续增加,启动过程将同步增加。

2、性能开始下降。HDFS文件系统的所有元数据相关操作基本上均在NameNode端完成,当数据规模的增加致内存占用变大后,元数据的增删改查性能会出现下降,且这种下降趋势会因规模效应及复杂的处理逻辑被放大,相对复杂的RPC请求(如addblock)性能下降更加明显。

3、NameNode JVM FGC(Full GC)风险较高。主要体现在两个方面:(1)FGC频率增加;(2)FGC时间增加且风险不可控。针对NameNode的应用场景,目前看CMS内存回收算法比较主流,正常情况下,对超过100GB内存进行回收处理时,可以控制到秒级别的停顿时间,但是如果回收失败被降级到串行内存回收时,应用的停顿时间将达到数百秒,这对应用本身是致命的。

4、超大JVM Heap Size调试问题。如果线上集群性能表现变差,不得不通过分析内存才能得到结论时,会成为一件异常困难的事情。且不说Dump本身极其费时费力,Dump超大内存时存在极大概率使NameNode不可服务。

针对NameNode内存增长带来的诸多问题,社区和业界都在持续关注并尝试不同的解决方案。整体上两个思路:(1)扩展NameNode分散单点负载;(2)引入外部系统支持NameNode内存数据。

从2010年开始社区就投入大量精力持续解决,Federation方案[3]通过对NameNode进行水平扩展分散单点负载的方式解决NameNode的问题,经过几年的发展该方案逐渐稳定,目前已经被业界广泛使用。除此之外,社区也在尝试将Namespace存储值外部的KV存储系统如LevelDB[4],从而降低NameNode内存负载。

除社区外,业界也在尝试自己的解决方案。Baidu HDFS2[5]将元数据管理通过主从架构的集群形式提供服务,本质上是将原生NameNode管理的Namespace和BlockManagement进行物理拆分。其中Namespace负责管理整个文件系统的目录树及文件到BlockID集合的映射关系,BlockID到DataNode的映射关系是按照一定的规则分到多个服务节点分布式管理,这种方案与Lustre有相似之处(Hash-based Partition)。Taobao HDFS2[6]尝试过采用另外的思路,借助高速存储设备,将元数据通过外存设备进行持久化存储,保持NameNode完全无状态,实现NameNode无限扩展的可能。其它类似的诸多方案不一而足。

尽管社区和业界均对NameNode内存瓶颈有成熟的解决方案,但是不一定适用所有的场景,尤其是中小规模集群。结合实践过程和集群规模发展期可能遇到的NameNode内存相关问题这里有几点建议:

  1. 合并小文件。正如前面提到,目录/文件和Block均会占用NameNode内存空间,大量小文件会降低内存使用效率;另外,小文件的读写性能远远低于大文件的读写,主要原因对小文件读写需要在多个数据源切换,严重影响性能。

  2. 调整合适的BlockSize。主要针对集群内文件较大的业务场景,可以通过调整默认的Block Size大小(参数:dfs.blocksize,默认128M),降低NameNode的内存增长趋势。

  3. HDFS Federation方案。当集群和数据均达到一定规模时,仅通过垂直扩展NameNode已不能很好的支持业务发展,可以考虑HDFS Federation方案实现对NameNode的水平扩展,在解决NameNode的内存问题的同时通过Federation可以达到良好的隔离性,不会因为单一应用压垮整集群。

五、总结

NameNode在整个HDFS系统架构中占据举足轻重的位置,内部数据和处理逻辑相对复杂,本文简单梳理了NameNode的内存全景及对其中几个关键数据结构,从NameNode内存核心数据视角对NameNode进行了简单的解读,并结合实际场景介绍了随着数据规模的增加,NameNode内存可能遇到的问题及业界各种可借鉴的解决方案。在后续的《HDFS NameNode内存详解》中,我们会详细解读NameNode的几个关键数据结构,分析各数据结构在JVM Heap使用占比情况。

六、参考

[1] Apache Hadoop, 2016, https://hadoop.apache.org/.
[2] Apache Hadoop Source Code, 2014, https://github.com/apache/hadoop/tree/branch-2.4.1/.
[3] HDFS Federation, 2011, https://issues.apache.org/jira/browse/HDFS-1052.
[4] NemeNode Scalability, 2013, https://issues.apache.org/jira/browse/HDFS-5389.
[5] Baidu HDFS2, 2013, http://static.zhizuzhefu.com/wordpress_cp/uploads/2013/04/a9.pdf.
[6] Taobao HDFS2, 2012, https://github.com/taobao/ADFS.

原文出自:https://tech.meituan.com/archives

分布式队列编程优化篇

美团点评阅读(2163)

前言

“分布式队列编程”是一个系列文,之前我们已经发布了《分布式队列编程模型、实战》,主要剖析了分布式队列编程模型的需求来源、定义、结构以及其变化多样性;根据作者在新美大实际工作经验,给出了队列式编程在分布式环境下的一些具体应用。本文将重点阐述工程师运用分布式队列编程构架的时候,在生产者、分布式队列以及消费者这三个环节的注意点以及优化建议。

确定采用分布式队列编程模型之后,主体架构就算完成了,但工程师的工作还远远未结束。天下事必做于细,细节是一个不错的架构向一个优秀的系统进阶的关键因素。优化篇选取了作者以及其同事在运用分布式队列编程模型架构时所碰到的典型问题和解决方案。这里些问题出现的频率较高,如果你经验不够,很可能会“踩坑”。希望通过这些讲解,帮助读者降低分布式队列编程模型的使用门槛。本文将对分布式队列编程模型的三种角色:生产者(Producer),分布式队列(Queue),消费者(Consumer)分别进行优化讨论。

生产者优化

在分布式队列编程中,生产者往往并非真正的生产源头,只是整个数据流中的一个节点,这种生产者的操作是处理-转发(Process-Forward)模式。

这种模式给工程师们带来的第一个问题是吞吐量问题。这种模式下运行的生产者,一边接收上游的数据,一边将处理完的数据发送给下游。本质上,它是一个非常经典的数学问题,其抽象模型是一些没有盖子的水箱,每个水箱接收来自上一个水箱的水,进行处理之后,再将水发送到下一个水箱。工程师需要预测水源的流量、每个环节水箱的处理能力、水龙头的排水速度,最终目的是避免水溢出水箱,或者尽可能地减小溢出事件的概率。实际上流式编程框架以及其开发者花了大量的精力去处理和优化这个问题。下文的缓存优化和批量写入优化都是针对该问题的解决方案。

第二个需要考虑的问题是持久化。由于各种原因,系统总是会宕机。如果信息比较敏感,例如计费信息、火车票订单信息等,工程师们需要考虑系统宕机所带来的损失,找到让损失最小化的解决方案。持久化优化重点解决这一类问题。

缓存优化

处于“处理-转发”模式下运行的生产者往往被设计成请求驱动型的服务,即每个请求都会触发一个处理线程,线程处理完后将结果写入分布式队列。如果由于某种原因队列服务不可用,或者性能恶化,随着新请求的到来,生产者的处理线程就会产生堆积。这可能会导致如下两个问题:

  • 系统可用性降低。由于每个线程都需要一定的内存开销,线程过多会使系统内存耗尽,甚至可能产生雪崩效应导致最终完全不可用。
  • 信息丢失。为了避免系统崩溃,工程师可能会给请求驱动型服务设置一个处理线程池,设置最大处理线程数量。这是一种典型的降级策略,目的是为了系统崩溃。但是,后续的请求会因为没有处理线程而被迫阻塞,最终可能产生信息丢失。例如:对于广告计费采集,如果采集系统因为线程耗尽而不接收客户端的计费行为,这些计费行为就会丢失。

缓解这类问题的思路来自于CAP理论,即通过降低一致性来提高可用性。生产者接收线程在收到请求之后第一时间不去处理,直接将请求缓存在内存中(牺牲一致性),而在后台启动多个处理线程从缓存中读取请求、进行处理并写入分布式队列。与线程所占用的内存开销相比,大部分的请求所占内存几乎可以忽略。通过在接收请求和处理请求之间增加一层内存缓存,可以大大提高系统的处理吞吐量和可扩展性。这个方案本质上是一个内存生产者消费者模型。

批量写入优化

如果生产者的请求过大,写分布式队列可能成为性能瓶颈,有如下几个因素:

  • 队列自身性能不高。
  • 分布式队列编程模型往往被应用在跨机房的系统里面,跨机房的网络开销往往容易成为系统瓶颈。
  • 消息确认机制往往会大大降低队列的吞吐量以及响应时间。

如果在处理请求和写队列之间添加一层缓存,消息写入程序批量将消息写入队列,可以大大提高系统的吞吐量。原因如下:

  • 批量写队列可以大大减少生产者和分布式队列的交互次数和消息传输量。特别是对于高吞吐小载荷的消息实体,批量写可以显著降低网络传输量。
  • 对于需要确认机制的消息,确认机制往往会大大降低队列的吞吐量以及响应时间,某些高敏感的消息需要多个消息中间件代理同时确认,这近一步恶化性能。在生产者的应用层将多条消息批量组合成一个消息体,消息中间件就只需要对批量消息进行一次确认,这可能会数量级的提高消息传输性能。

持久化优化

通过添加缓存,消费者服务的吞吐量和可用性都得到了提升。但缓存引入了一个新问题——内存数据丢失。对于敏感数据,工程师需要考虑如下两个潜在问题:

  • 如果内存中存在未处理完的请求,而某些原因导致生产者服务宕机,内存数据就会丢失而可能无法恢复。
  • 如果分布式队列长时间不可用,随着请求数量的不断增加,最终系统内存可能会耗尽而崩溃,内存的消息也可能丢失。

所以缓存中的数据需要定期被持久化到磁盘等持久层设备中,典型的持久化触发策略主要有两种:

  • 定期触发,即每隔一段时间进行一次持久化。
  • 定量触发,即每当缓存中的请求数量达到一定阈值后进行持久化。
    是否需要持久化优化,以及持久化策略应该由请求数据的敏感度、请求量、持久化性能等因素共同决定。

中间件选型

分布式队列不等同于各种开源的或者收费的消息中间件,甚至在一些场景下完全不需要使用消息中间件。但是,消息中间件产生的目的就是解决消息传递问题,这为分布式队列编程架构提供了很多的便利。在实际工作中,工程师们应该将成熟的消息中间件作为队列的首要备选方案。
本小节对消息中间件的功能、模型进行阐述,并给出一些消息中间件选型、部署的具体建议。

中间件的功能

明白一个系统的每个具体功能是设计和架构一个系统的基础。典型的消息中间件主要包含如下几个功能:

  • 消息接收
  • 消息分发
  • 消息存储
  • 消息读取

概念模型

抽象的消息中间件模型包含如下几个角色:

  • 发送者和接收者客户端(Sender/Receiver Client),在具体实施过程中,它们一般以库的形式嵌入到应用程序代码中。
  • 代理服务器(Broker Server),它们是与客户端代码直接交互的服务端代码。
  • 消息交换机(Exchanger),接收到的消息一般需要通过消息交换机(Exchanger)分发到具体的消息队列中。
  • 消息队列,一般是一块内存数据结构或持久化数据。
    概念模型如下图:
    消息队列中间件
    为了提高分发性能,很多消息中间件把消息代理服务器的拓扑图发送到发送者和接收者客户端(Sender/Receiver Client),如此一来,发送源可以直接进行消息分发。

选型标准

要完整的描述消息中间件各个方面非常困难,大部分良好的消息中间件都有完善的文档,这些文档的长度远远超过本文的总长度。但如下几个标准是工程师们在进行消息中间件选型时经常需要考虑和权衡的。

性能

性能主要有两个方面需要考虑:吞吐量(Throughput)和响应时间(Latency)。
不同的消息队列中间件的吞吐量和响应时间相差甚远,在选型时可以去网上查看一些性能对比报告。
对于同一种中间件,不同的配置方式也会影响性能。主要有如下几方面的配置:

  • 是否需要确认机制,即写入队列后,或从队列读取后,是否需要进行确认。确认机制对响应时间的影响往往很大。
  • 能否批处理,即消息能否批量读取或者写入。批量操作可以大大减少应用程序与消息中间件的交互次数和消息传递量,大大提高吞吐量。
  • 能否进行分区(Partition)。将某一主题消息队列进行分区,同一主题消息可以有多台机器并行处理。这不仅仅能影响消息中间件的吞吐量,还决定着消息中间件是否具备良好的可伸缩性(Scalability)。
  • 是否需要进行持久化。将消息进行持久化往往会同时影响吞吐量和响应时间。

可靠性

可靠性主要包含:可用性、持久化、确认机制等。
高可用性的消息中间件应该具备如下特征:

  • 消息中间件代理服务器(Broker)具有主从备份。即当一台代理服务宕机之后,备用服务器能接管相关的服务。
  • 消息中间件中缓存的消息是否有备份、并持久化。
    根据CAP理论,高可用、高一致性以及网络分裂不可兼得。根据作者的观察,大部分的消息中间件在面临网络分裂的情况下下,都很难保证数据的一致性以及可用性。 很多消息中间件都会提供一些可配置策略,让使用者在可用性和一致性之间做权衡。

高可靠的消息中间件应该确保从发送者接收到的消息不会丢失。中间件代理服务器的宕机并不是小概率事件,所以保存在内存中的消息很容易发生丢失。大部分的消息中间件都依赖于消息的持久化去降低消息丢失损失,即将接收到的消息写入磁盘。即使提供持久化,仍有两个问题需要考虑:

  • 磁盘损坏问题。长时间来看,磁盘出问题的概率仍然存在。
  • 性能问题。与操作内存相比,磁盘I/O的操作性能要慢几个数量级。频繁持久化不仅会增加响应时间,也会降低吞吐量。
    解决这两个问题的一个解决方案就是:多机确认,定期持久化。即消息被缓存在多台机器的内存中,只有每台机器都确认收到消息,才跟发送者确认(很多消息中间件都会提供相应的配置选项,让用户设置最少需要多少台机器接收到消息)。由于多台独立机器同时出故障的概率遵循乘法法则,指数级降低,这会大大提高消息中间件的可靠性。

确认机制本质上是通讯的握手机制(Handshaking)。如果没有该机制,消息在传输过程中丢失将不会被发现。高敏感的消息要求选取具备确认机制的消息中间件。当然如果没有接收到消息中间件确认完成的指令,应用程序需要决定如何处理。典型的做法有两个:

  • 多次重试。
  • 暂存到本地磁盘或其它持久化媒介。

客户端接口所支持语言

采用现存消息中间件就意味着避免重复造轮子。如果某个消息中间件未能提供对应语言的客户端接口,则意味着极大的成本和兼容性问题。

投递策略(Delivery policies)

投递策略指的是一个消息会被发送几次。主要包含三种策略:最多一次(At most Once )、最少一次(At least Once)、仅有一次(Exactly Once)。
在实际应用中,只考虑消息中间件的投递策略并不能保证业务的投递策略,因为接收者在确认收到消息和处理完消息并持久化之间存在一个时间窗口。例如,即使消息中间件保证仅有一次(Exactly Once),如果接收者先确认消息,在持久化之前宕机,则该消息并未被处理。从应用的角度,这就是最多一次(At most Once)。反之,接收者先处理消息并完成持久化,但在确认之前宕机,消息就要被再次发送,这就是最少一次(At least Once)。 如果消息投递策略非常重要,应用程序自身也需要仔细设计。

消费者优化

消费者是分布式队列编程中真正的数据处理方,数据处理方最常见的挑战包括:有序性、串行化(Serializability)、频次控制、完整性和一致性等。

挑战

有序性

在很多场景下,如何保证队列信息的有序处理是一个棘手的问题。如下图,假定分布式队列保证请求严格有序,请求ri2和ri1都是针对同一数据记录的不同状态,ri2的状态比ri1的状态新。T1、T2、T3和T4代表各个操作发生的时间,并且 T1 < T2 < T3 < T4(”<“代表早于)。
采用多消费者架构,这两条记录被两个消费者(Consumer1和Consumer2)处理后更新到数据库里面。Consumer1虽然先读取ri1但是却后写入数据库,这就导致,新的状态被老的状态覆盖,所以多消费者不保证数据的有序性。
有序性

串行化

很多场景下,串行化是数据处理的一个基本需求,这是保证数据完整性、可恢复性、事务原子性等的基础。为了在并行计算系统里实现串行化,一系列的相关理论和实践算法被提出。对于分布式队列编程架构,要在在多台消费者实现串行化非常复杂,无异于重复造轮子。

频次控制

有时候,消费者的消费频次需要被控制,可能的原因包括:

  • 费用问题。如果每次消费所引起的操作都需要收费,而同一个请求消息在队列中保存多份,不进行频次控制,就会导致无谓的浪费。
  • 性能问题。每次消费可能会引起对其他服务的调用,被调用服务希望对调用量有所控制,对同一个请求消息的多次访问就需要有所控制。

完整性和一致性

完整性和一致性是所有多线程和多进程的代码都面临的问题。在多线程或者多进程的系统中考虑完整性和一致性往往会大大地增加代码的复杂度和系统出错的概率。

单例服务优化

几乎所有串行化理论真正解决的问题只有一个:性能。 所以,在性能允许的前提下,对于消费者角色,建议采用单实例部署。通过单实例部署,有序性、串行化、完整性和一致性问题自动获得了解决。另外,单实例部署的消费者拥有全部所需信息,它可以在频次控制上采取很多优化策略。

天下没有免费的午餐。同样,单实例部署并非没有代价,它意味着系统可用性的降低,很多时候,这是无法接受的。解决可用性问题的最直接的思路就是冗余(Redundancy)。最常用的冗余方案是Master-slave架构,不过大部分的Master-slave架构都是Active/active模式,即主从服务器都提供服务。例如,数据库的Master-slave架构就是主从服务器都提供读服务,只有主服务器提供写服务。大部分基于负载均衡设计的Master-slave集群中,主服务器和从服务器同时提供相同的服务。这显然不满足单例服务优化需求。有序性和串行化需要Active/passive架构,即在某一时刻只有主实例提供服务,其他的从服务等待主实例失效。这是典型的领导人选举架构,即只有获得领导权的实例才能充当实际消费者,其他实例都在等待下一次选举。采用领导人选举的Active/passive架构可以大大缓解纯粹的单实例部署所带来的可用性问题。

令人遗憾的是,除非工程师们自己在消费者实例里面实现Paxos等算法,并在每次消息处理之前都执行领导人选举。否则,理论上讲,没有方法可以保障在同一个时刻只有一个领导者。而对每个消息都执行一次领导人选举,显然性能不可行。实际工作中,最容易出现的问题时机发生在领导人交接过程中,即前任领导人实例变成辅助实例,新部署实例开始承担领导人角色。为了平稳过渡,这两者之间需要有一定的通讯机制,但是,无论是网络分区(Network partition)还是原领导人服务崩溃都会使这种通讯机制变的不可能。

对于完整性和一致性要求很高的系统,我们需要在选举制度和交接制度这两块进行优化。

领导人选举架构

典型的领导人选举算法有Paxos、ZAB( ZooKeeper Atomic Broadcast protocol)。为了避免重复造轮子,建议采用ZooKeeper的分布式锁来实现领导人选举。典型的ZooKeeper实现算法如下(摘自参考资料[4]):

Let ELECTION be a path of choice of the application. To volunteer to be a leader:

1.Create znode z with path “ELECTION/guid-n_” with both SEQUENCE and EPHEMERAL flags;
2.Let C be the children of “ELECTION”, and i be the sequence number of z;
3.Watch for changes on “ELECTION/guid-n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;

Upon receiving a notification of znode deletion:

1.Let C be the new set of children of ELECTION;
2.If z is the smallest node in C, then execute leader procedure;
3.Otherwise, watch for changes on “ELECTION/guid-n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;

领导人交接架构

领导人选举的整个过程发生在ZooKeeper集群中,各个消费者实例在这场选举中只充当被告知者角色(Learner)。领导人选举算法,只能保证最终只有一个Leader被选举出来,并不保障被告知者对Leader的理解是完全一致的。本质上,上文的架构里,选举的结果是作为令牌(Token)传递给消费者实例,消费者将自身的ID与令牌进行对比,如果相等,则开始执行消费操作。所以当发生领导人换届的情况,不同的Learner获知新Leader的时间并不同。例如,前任Leader如果因为网络问题与ZooKeeper集群断开,前任Leader只能在超时后才能判断自己是否不再承担Leader角色了,而新的Leader可能在这之前已经产生。另一方面,即使前任Leader和新Leader同时接收到新Leader选举结果,某些业务的完整性要求迫使前任Leader仍然完成当前未完成的工作。以上的讲解非常抽象,生活中却给了一些更加具体的例子。众所周知,美国总统候选人在选举结束后并不直接担任美国总统,从选举到最终承担总统角色需要一个过渡期。对于新当选Leader的候选人而言,过渡期间称之为加冕阶段(Inauguration)。对于即将卸任的Leader,过渡期称为交接阶段(HandOver)。所以一个基于领导人选举的消费者从加冕到卸任经历三个阶段:Inauguration、Execution、HandOver。在加冕阶段,新领导需要进行一些初始化操作。Execution阶段是真正的队列消息处理阶段。在交接阶段,前任领导需要进行一些清理操作。

类似的,为了解决领导人交接问题,所有的消费者从代码实现的角度都需要实现类似ILeaderCareer接口。这个接口包含三个方发inaugurate(),handOver()和execute()。某个部署实例(Learner)在得知自己承担领导人角色后,需要调用inaugurate()方法,进行加冕。主要的消费逻辑通过不停的执行execute()实现,当确认自己不再承担领导人之后,执行handOver()进行交接。

public interface ILeaderCareer {
    public void inaugurate();
    public void handOver();
    public boolean execute();
}

如果承担领导人角色的消费者,在执行execute()阶段得知自己将要下台,根据消息处理的原子性,该领导人可以决定是否提前终止操作。如果整个消息处理是一个原子性事务,直接终止该操作可以快速实现领导人换届。否则,前任领导必须完成当前消息处理后,才进入交接阶段。这意味着新的领导人,在inaugurate()阶段需要进行一定时间的等待。

排重优化

频次控制是一个经典问题。对于分布式队列编程架构,相同请求重复出现在队列的情况并不少见。如果相同请求在队列中重复太多,排重优化就显得很必要。分布式缓存更新是一个典型例子,所有请求都被发送到队列中用于缓存更新。如果请求符合典型的高斯分布,在一段时间内会出现大量重复的请求,而同时多线程更新同一请求缓存显然没有太大的意义。
排重优化是一个算法,其本质是基于状态机的编程,整个讲解通过模型、构思和实施三个步骤完成。

模型

进行排重优化的前提是大量重复的请求。在模型这一小节,我们首先阐述重复度模型、以及不同重复度所导致的消费模型,最后基于这两个模型去讲解排重状态机。

重复度模型

首先我们给出最小重复长度的概念。同一请求最小重复长度:同一请求在队列中的重复出现的最小间距。例如,请求ri第一次出现在位置3,第二次出现在10,最小重复长度等于7。
是否需要进行排重优化取决于队列中请求的重复度。由于不同请求之间并不存在重复的问题,不失一般性,这里的模型只考了单个请求的重复度,重复度分为三个类:无重复、稀疏重复、高重复。
无重复:在整个请求过程,没有任何一个请求出现一次以上。
稀疏重复:主要的请求最小重复长度大于消费队列长度。
高重复:大量请求最小重复长度小于消费队列长度。
对于不同的重复度,会有不同的消费模型。

无重复消费模型

在整个队列处理过程中,所有的请求都不相同,如下图:
无重复消费模型

稀疏重复消费模型

当同一请求最小重复长度大于消费者队列长度,如下图。假定有3个消费者,Consumer1将会处理r1,Consumer2将会处理r2,Consumer3将会处理r3,如果每个请求处理的时间严格相等,Consumer1在处理完r1之后,接着处理r4,Consumer2将会处理r2之后会处理r1。虽然r1被再次处理,但是任何时刻,只有这一个消费者在处理r1,不会出现多个消费者同时处理同一请求的场景。
稀疏重复消费模型

高重复消费模型

如下图,仍然假定有3个消费者,队列中前面4个请求都是r1,它会同时被3个消费者线程处理:
高重复消费模型
显然,对于无重复和稀疏重复的分布式队列,排重优化并不会带来额外的好处。排重优化所针对的对象是高重复消费模型,特别是对于并行处理消费者比较多的情况,重复处理同一请求,资源消耗极大。

排重状态机

排重优化的主要对象是高重复的队列,多个消费者线程或进程同时处理同一个幂等请求只会浪费计算资源并延迟其他待请求处理。所以,排重状态机的一个目标是处理唯一性,即:同一时刻,同一个请求只有一个消费者处理。如果消费者获取一条请求消息,但发现其他消费者正在处理该消息,则当前消费者应该处于等待状态。如果对同一请求,有一个消费者在处理,一个消费者在等待,而同一请求再次被消费者读取,再次等待则没有意义。所以,状态机的第二个目标是等待唯一性,即:同一时刻,同一个请求最多只有一个消费者处于等待状态。总上述,状态机的目标是:处理唯一性和等待唯一性。我们把正在处理的请求称为头部请求,正在等待的请求称为尾部请求。
由于状态机的处理单元是请求,所以需要针对每一个请求建立一个排重状态机。基于以上要求,我们设计的排重状态机包含4个状态Init,Process,Block,Decline。各个状态之间转化过程如下图:
排重状态机

  1. 状态机创建时处于Init状态。
  2. 对Init状态进行Enqueue操作,即接收一个请求,开始处理(称为头部请求),状态机进入Process状态。
  3. 状态机处于Process状态,表明当前有消费者正在处理头部请求。此时,如果进行Dequeue操作,即头部请求处理完成,返回Init状态。如果进行Enqueue操作,即另一个消费者准备处理同一个请求,状态机进入Block状态(该请求称为尾部请求)。
  4. 状态机处于Block状态,表明头部请求正在处理,尾部请求处于阻塞状态。此时,进行Dequeue操作,即头部请求处理完成,返回Process状态,并且尾部请求变成头部请求,原尾部请求消费者结束阻塞状态,开始处理。进行Enqueue操作,表明一个新的消费者准备处理同一个请求,状态机进入Decline状态。
  5. 状态机进入Decline状态,根据等待唯一性目标,处理最新请求的消费者将被抛弃该消息,状态机自动转换回Block状态。

构思

状态机描述的是针对单个请求操作所引起状态变化,排重优化需要解决队列中所有请求的排重问题,需要对所有请求的状态机进行管理。这里只考虑单虚拟机内部对所有请求状态机的管理,对于跨虚拟机的管理可以采用类似的方法。对于多状态机管理主要包含三个方面:一致性问题、完整性问题和请求缓存驱逐问题。

一致性问题

一致性在这里要求同一请求的不同消费者只会操作一个状态机。由于每个请求都产生一个状态机,系统将会包含大量的状态机。为了兼顾性能和一致性,我们采用ConcurrentHashMap保存所有的状态机。用ConcurrentHashMap而不是对整个状态机队列进行加锁,可以提高并行处理能力,使得系统可以同时操作不同状态机。为了避免处理同一请求的多消费者线程同时对ConcurrentHashMap进行插入所导致状态机不一致问题,我们利用了ConcurrentHashMap的putIfAbsent()方法。代码方案如下,key2Status用于存储所有的状态机。消费者在处理请求之前,从状态机队列中读取排重状态机TrafficAutomate。如果没有找到,则创建一个新的状态机,并通过putIfAbsent()方法插入到状态机队列中。

private ConcurrentHashMap<T, TrafficAutomate> key2Status = new ConcurrentHashMap();
TrafficAutomate trafficAutomate = key2Status.get(key);
if(trafficAutomate == null)
{
    trafficAutomate = new TrafficAutomate();
    TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);
    if(oldAutomate != null)
    {
        trafficAutomate = oldAutomate;
    }
}

完整性问题

完整性要求保障状态机Init,Process,Block,Decline四种状态正确、状态之间的转换也正确。由于状态机的操作非常轻量级,兼顾完整性和降低代码复杂度,我们对状态机的所有方法进行加锁。

请求缓存驱逐问题(Cache Eviction)

如果不同请求的数量太多,内存永久保存所有请求的状态机的内存开销太大。所以,某些状态机需要在恰当的时候被驱逐出内存。这里有两个思路:

  • 当状态机返回Init状态时,清除出队列。
  • 启动一个后台线程,定时扫描状态机队列,采用LRU等标准缓存清除机制。

标识问题

每个请求对应于一个状态机,不同的状态机采用不同的请求进行识别。
对于同一状态机的不同消费者,在单虚拟机方案中,我们采用线程id进行标识。

实施

排重优化的主要功能都是通过排重状态机(TrafficAutomate)和状态机队列(QueueCoordinator)来实施的。排重状态机描述的是针对单个请求的排重问题,状态机队列解决所有请求状态机的排重问题。

状态机实施(TrafficAutomate)

根据状态机模型,其主要操作为enQueue和deQueue,其状态由头部请求和尾部请求的状态共同决定,所以需要定义两个变量为head和tail,用于表示头部请求和尾部请求。为了确保多线程操作下状态机的完整性(Integraty),所有的操作都将加上锁。

enQueue操作

当一个消费者执行enQueue操作时:如果此时尾部请求不为空,根据等待唯一性要求,返回DECLINE,当前消费者应该抛弃该请求;如果头部请求为空,返回ACCPET,当前消费者应该立刻处理该消息;否则,返回BLOCK,该消费者应该等待,并不停的查看状态机的状态,一直到头部请求处理完成。enQueue代码如下:

synchronized ActionEnum enQueue(long id)
{ 
    if(tail != INIT_QUEUE_ID)
    {
        return DECLINE;
    }

    if(head == INIT_QUEUE_ID)
    {
        head = id;
        return ACCEPT;
    }
    else
    {
        tail = id;
        return BLOCK;
    }
}
deQueue操作

对于deQueue操作,首先将尾部请求赋值给头部请求,并将尾部请求置为无效。deQueue代码如下:

synchronized boolean deQueue(long id)
{
        head = tail;
        tail = INIT_QUEUE_ID;
        return true;
}

状态机队列实施(QueueCoordinator)

接口定义

状态机队列集中管理所有请求的排重状态机,所以其操作和单个状态机一样,即enQueue和deQueuqe接口。这两个接口的实现需要识别特定请求的状态机,所以它们的入参应该是请求。为了兼容不同类型的请求消息,我们采用了Java泛型编程。接口定义如下:

public interface QueueCoordinator<T> {

    public boolean enQueue(T key);

    public void deQueue(T key);

}
enQueue操作

enQueue操作过程如下:
首先,根据传入的请求key值,获取状态机, 如果不存在则创建一个新的状态机,并保存在ConcurrentHashMap中。
接下来,获取线程id作为该消费者的唯一标识,并对对应状态机进行enQueue操作。
如果状态机返回值为ACCEPT或者DECLINE,返回业务层处理代码,ACCEPT意味着业务层需要处理该消息,DECLINE表示业务层可以抛弃当前消息。如果状态机返回值为Block,则该线程保持等待状态。
异常处理。在某些情况下,头部请求线程可能由于异常,未能对状态机进行deQueue操作(作为组件提供方,不能假定所有的规范被使用方实施)。为了避免处于阻塞状态的消费者无期限地等待,建议对状态机设置安全超时时限。超过了一定时间后,状态机强制清空头部请求,返回到业务层,业务层开始处理该请求。
代码如下:

public boolean enQueue(T key) {
    _loggingStastic();

    TrafficAutomate trafficAutomate = key2Status.get(key);
    if(trafficAutomate == null)
    {
        trafficAutomate = new TrafficAutomate();
        TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);
        if(oldAutomate != null)
        {
            trafficAutomate = oldAutomate;
        }
    }
    long threadId = Thread.currentThread().getId();

    ActionEnum action = trafficAutomate.enQueue(threadId);

    if(action == DECLINE)
    {
        return false;
    }
    else if (action == ACCEPT)
    {
        return true;
    }
    //Blocking status means some other thread are working on this key, so just wait till timeout
    long start = System.currentTimeMillis();
    long span = 0;
    do {
        _nonExceptionSleep(NAP_TIME_IN_MILL);

        if(trafficAutomate.isHead(threadId))
        {
            return true;
        }

        span = System.currentTimeMillis() - start;
    }while(span <= timeout);

    //remove head so that it won't block the queue for too long
    trafficAutomate.evictHeadByForce(threadId);

    return true;
}
deQueue操作

deQueue操作首先从ConcurrentHashMap获取改请求所对应的状态机,接着获取该线程的线程id,对状态机进行deQueue操作。
enQueue代码如下:

public void deQueue(T key) {
    TrafficAutomate trafficAutomate = key2Status.get(key);

    if(trafficAutomate == null)
    {
        logger.error("key {} doesn't exist ", key);
        return;
    }

    long threadId = Thread.currentThread().getId();

    trafficAutomate.deQueue(threadId);
}

源代码

完整源代码可以在QueueCoordinator获取。

参考资料

[1] Rabbit MQ, Highly Available Queues.
[2] IBM Knowledge Center, Introduction to message queuing.
[3] Wikipedia, Serializability.
[4] Hadoop, ZooKeeper Recipes and Solutions.
[5] Apache Kafka.
[6] Lamport L, Paxos Made Simple.

原文出自:https://tech.meituan.com/archives

分布式队列编程:模型、实战

美团点评阅读(2110)

介绍

作为一种基础的抽象数据结构,队列被广泛应用在各类编程中。大数据时代对跨进程、跨机器的通讯提出了更高的要求,和以往相比,分布式队列编程的运用几乎已无处不在。但是,这种常见的基础性的事物往往容易被忽视,使用者往往会忽视两点:

  • 使用分布式队列的时候,没有意识到它是队列。
  • 有具体需求的时候,忘记了分布式队列的存在。

文章首先从最基础的需求出发,详细剖析分布式队列编程模型的需求来源、定义、结构以及其变化多样性。通过这一部分的讲解,作者期望能在两方面帮助读者:一方面,提供一个系统性的思考方法,使读者能够将具体需求关联到分布式队列编程模型,具备进行分布式队列架构的能力;另一方面,通过全方位的讲解,让读者能够快速识别工作中碰到的各种分布式队列编程模型。

文章的第二部分实战篇。根据作者在新美大实际工作经验,给出了队列式编程在分布式环境下的一些具体应用。这些例子的基础模型并非首次出现在互联网的文档中,但是所有的例子都是按照挑战、构思、架构三个步骤进行讲解的。这种讲解方式能给读者一个“从需求出发去构架分布式队列编程”的旅程。

分布式队列编程模型

模型篇从基础的需求出发,去思考何时以及如何使用分布式队列编程模型。建模环节非常重要,因为大部分中高级工程师面临的都是具体的需求,接到需求后的第一个步骤就是建模。通过本篇的讲解,希望读者能够建立起从需求到分布式队列编程模型之间的桥梁。

何时选择分布式队列

通讯是人们最基本的需求,同样也是计算机最基本的需求。对于工程师而言,在编程和技术选型的时候,更容易进入大脑的概念是RPC、RESTful、Ajax、Kafka。在这些具体的概念后面,最本质的东西是“通讯”。所以,大部分建模和架构都需要从“通讯”这个基本概念开始。当确定系统之间有通讯需求的时候,工程师们需要做很多的决策和平衡,这直接影响工程师们是否会选择分布式队列编程模型作为架构。从这个角度出发,影响建模的因素有四个:When、Who、Where、How。

When:同步VS异步

通讯的一个基本问题是:发出去的消息什么时候需要被接收到?这个问题引出了两个基础概念:“同步通讯”和“异步通讯”。根据理论抽象模型,同步通讯和异步通讯最本质的差别来自于时钟机制的有无。同步通讯的双方需要一个校准的时钟,异步通讯的双方不需要时钟。现实的情况是,没有完全校准的时钟,所以没有绝对的同步通讯。同样,绝对异步通讯意味着无法控制一个发出去的消息被接收到的时间点,无期限的等待一个消息显然毫无实际意义。所以,实际编程中所有的通讯既不是“同步通讯”也不是“异步通讯”;或者说,既是“同步通讯”也是“异步通讯”。特别是对于应用层的通讯,其底层架构可能既包含“同步机制”也包含“异步机制”。判断“同步”和“异步”消息的标准问题太深,而不适合继续展开。作者这里给一些启发式的建议:

  • 发出去的消息是否需要确认,如果不需要确认,更像是异步通讯,这种通讯有时候也称为单向通讯(One-Way Communication)。
  • 如果需要确认,可以根据需要确认的时间长短进行判断。时间长的更像是异步通讯,时间短的更像是同步通讯。当然时间长短的概念是纯粹的主观概念,不是客观标准。
  • 发出去的消息是否阻塞下一个指令的执行,如果阻塞,更像是同步,否则,更像是异步。

无论如何,工程师们不能生活在混沌之中,不做决定往往是最坏的决定。当分析一个通讯需求或者进行通讯构架的时候,工程师们被迫作出“同步”还是“异步”的决定。当决策的结论是“异步通讯”的时候,分布式队列编程模型就是一个备选项。

Who:发送者接收者解耦

在进行通讯需求分析的时候,需要回答的另外一个基本问题是:消息的发送方是否关心谁来接收消息,或者反过来,消息接收方是否关心谁来发送消息。如果工程师的结论是:消息的发送方和接收方不关心对方是谁、以及在哪里,分布式队列编程模型就是一个备选项。因为在这种场景下,分布式队列架构所带来的解耦能给系统架构带来这些好处:

  • 无论是发送方还是接收方,只需要跟消息中间件通讯,接口统一。统一意味着降低开发成本。
  • 在不影响性能的前提下,同一套消息中间件部署,可以被不同业务共享。共享意味着降低运维成本。
  • 发送方或者接收方单方面的部署拓扑的变化不影响对应的另一方。解藕意味着灵活和可扩展。

Where:消息暂存机制

在进行通讯发送方设计的时候,令工程师们苦恼的问题是:如果消息无法被迅速处理掉而产生堆积怎么办、能否被直接抛弃?如果根据需求分析,确认存在消息积存,并且消息不应该被抛弃,就应该考虑分布式队列编程模型构架,因为队列可以暂存消息。

How:如何传递

对通讯需求进行架构,一系列的基础挑战会迎面而来,这包括:

  • 可用性,如何保障通讯的高可用。
  • 可靠性,如何保证消息被可靠地传递。
  • 持久化,如何保证消息不会丢失。
  • 吞吐量和响应时间。
  • 跨平台兼容性。
    除非工程师对造轮子有足够的兴趣,并且有充足的时间,采用一个满足各项指标的分布式队列编程模型就是一个简单的选择。

分布式队列编程定义

很难给出分布式队列编程模型的精确定义,由于本文偏重于应用,作者并不打算完全参照某个标准的模型。总体而言:分布式队列编程模型包含三类角色:发送者(Sender)、分布式队列(Queue)、接收者(Receiver)。发送者和接收者分别指的是生产消息和接收消息的应用程序或服务。
需要重点明确的概念是分布式队列,它是提供以下功能的应用程序或服务:1. 接收“发送者”产生的消息实体;2. 传输、暂存该实体;3. 为“接收者”提供读取该消息实体的功能。特定的场景下,它当然可以是Kafka、RabbitMQ等消息中间件。但它的展现形式并不限于此,例如:

  • 队列可以是一张数据库的表,发送者将消息写入表,接收者从数据表里读消息。
  • 如果一个程序把数据写入Redis等内存Cache里面,另一个程序从Cache里面读取,缓存在这里就是一种分布式队列。
  • 流式编程里面的的数据流传输也是一种队列。
  • 典型的MVC(Model–view–controller)设计模式里面,如果Model的变化需要导致View的变化,也可以通过队列进行传输。这里的分布式队列可以是数据库,也可以是某台服务器上的一块内存。

抽象模型

最基础的分布式队列编程抽象模型是点对点模型,其他抽象构架模型居于改基本模型上各角色的数量和交互变化所导致的不同拓扑图。具体而言,不同数量的发送者、分布式队列以及接收者组合形成了不同的分布式队列编程模型。记住并理解典型的抽象模型结构对需求分析和建模而言至关重要,同时也会有助于学习和深入理解开源框架以及别人的代码。

点对点模型(Point-to-point)

基础模型中,只有一个发送者、一个接收者和一个分布式队列。如下图所示:
点对点模型

生产者消费者模型(Producer–consumer)

如果发送者和接收者都可以有多个部署实例,甚至不同的类型;但是共用同一个队列,这就变成了标准的生产者消费者模型。在该模型,三个角色一般称之为生产者(Producer)、分布式队列(Queue)、消费者(Consumer)。
生产者消费者模型

发布订阅模型(PubSub)

如果只有一类发送者,发送者将产生的消息实体按照不同的主题(Topic)分发到不同的逻辑队列。每种主题队列对应于一类接收者。这就变成了典型的发布订阅模型。在该模型,三个角色一般称之为发布者(Publisher),分布式队列(Queue),订阅者(Subscriber)。
发布订阅模型

MVC模型

如果发送者和接收者存在于同一个实体中,但是共享一个分布式队列。这就很像经典的MVC模型。

MVC模型

编程模型

为了让读者更好地理解分布式队列编程模式概念,这里将其与一些容易混淆的概念做一些对比 。

分布式队列模型编程和异步编程

分布式队列编程模型的通讯机制一般是采用异步机制,但是它并不等同于异步编程。
首先,并非所有的异步编程都需要引入队列的概念,例如:大部分的操作系统异步I/O操作都是通过硬件中断( Hardware Interrupts)来实现的。
其次,异步编程并不一定需要跨进程,所以其应用场景并不一定是分布式环境。
最后,分布式队列编程模型强调发送者、接收者和分布式队列这三个角色共同组成的架构。这三种角色与异步编程没有太多关联。

分布式队列模式编程和流式编程

随着Spark Streaming,Apache Storm等流式框架的广泛应用,流式编程成了当前非常流行的编程模式。但是本文所阐述的分布式队列编程模型和流式编程并非同一概念。
首先,本文的队列编程模式不依赖于任何框架,而流式编程是在具体的流式框架内的编程。
其次,分布式队列编程模型是一个需求解决方案,关注如何根据实际需求进行分布式队列编程建模。流式框架里的数据流一般都通过队列传递,不过,流式编程的关注点比较聚焦,它关注如何从流式框架里获取消息流,进行map、reduce、 join等转型(Transformation)操作、生成新的数据流,最终进行汇总、统计。


分布式队列编程实战篇

这里所有的项目都是作者在新美大工作的真实案例。实战篇的关注点是训练建模思路,所以这些例子都按照挑战、构思、架构三个步骤进行讲解。受限于保密性要求,有些细节并未给出,但这些细节并不影响讲解的完整性。另一方面,特别具体的需求容易让人费解,为了使讲解更加顺畅,作者也会采用一些更通俗易懂的例子。通过本篇的讲解,希望和读者一起去实践“如何从需求出发去构架分布式队列编程模型”。

需要声明的是,这里的解决方案并不是所处场景的最优方案。但是,任何一个稍微复杂的问题,都没有最优解决方案,更谈不上唯一的解决方案。实际上,工程师每天所追寻的只是在满足一定约束条件下的可行方案。当然不同的约束会导致不同的方案,约束的松弛度决定了工程师的可选方案的宽广度。

信息采集处理

信息采集处理应用广泛,例如:广告计费、用户行为收集等。作者碰到的具体项目是为广告系统设计一套高可用的采集计费系统。
典型的广告CPC、CPM计费原理是:收集用户在客户端或者网页上的点击和浏览行为,按照点击和浏览进行计费。计费业务有如下典型特征:

  • 采集者和处理者解耦,采集发生在客户端,而计费发生在服务端。
  • 计费与钱息息相关。
  • 重复计费意味着灾难。
  • 计费是动态实时行为,需要接受预算约束,如果消耗超过预算,则广告投放需要停止。
  • 用户的浏览和点击量非常大。

挑战

计费业务的典型特征给我们带来了如下挑战:

  • 高吞吐量--广告的浏览和点击量非常巨大,我们需要设计一个高吞吐量的采集架构。
  • 高可用性--计费信息的丢失意味着直接的金钱损失。任何处理服务器的崩溃不应该导致系统不可用。
  • 高一致性要求--计费是一个实时动态处理过程,但要受到预算的约束。收集到的浏览和点击行为如果不能快速处理,可能会导致预算花超,或者点击率预估不准确。所以采集到的信息应该在最短的时间内传输到计费中心进行计费。
  • 完整性约束--这包括反作弊规则,单个用户行为不能重复计费等。这要求计费是一个集中行为而非分布式行为。
  • 持久化要求--计费信息需要持久化,避免因为机器崩溃而导致收集到的数据产生丢失。

构思

采集的高可用性意味着我们需要多台服务器同时采集,为了避免单IDC故障,采集服务器需要部署在多IDC里面。
实现一个高可用、高吞吐量、高一致性的信息传递系统显然是一个挑战,为了控制项目开发成本,采用开源的消息中间件进行消息传输就成了必然选择。
完整性约束要求集中进行计费,所以计费系统发生在核心IDC。
计费服务并不关心采集点在哪里,采集服务也并不关心谁进行计费。
根据以上构思,我们认为采集计费符合典型的“生产者消费者模型”。

架构

采集计费系统架构图如下:

  • 用户点击浏览收集服务(Click/View Collector)作为生产者部署在多个机房里,以提高收集服务可用性。
  • 每个机房里采集到的数据通过消息队列中间件发送到核心机房IDC_Master。
  • Billing服务作为消费者部署在核心机房集中计费。
    计费采集架构

采用此架构,我们可以在如下方面做进一步优化:

  • 提高可扩展性,如果一个Billing部署实例在性能上无法满足要求,可以对采集的数据进行主题分区(Topic Partition)计费,即采用发布订阅模式以提高可扩展性(Scalability)。
  • 全局排重和反作弊。采用集中计费架构解决了点击浏览排重的问题,另一方面,这也给反作弊提供了全局信息。
  • 提高计费系统的可用性。采用下文单例服务优化策略,在保障计费系统集中性的同时,提高计费系统可用性。

分布式缓存更新(Distributed Cache Replacement)

缓存是一个非常宽泛的概念,几乎存在于系统各个层级。典型的缓存访问流程如下:

  • 接收到请求后,先读取缓存,如果命中则返回结果。
  • 如果缓存不命中,读取DB或其它持久层服务,更新缓存并返回结果。
    缓存更新
    对于已经存入缓存的数据,其更新时机和更新频率是一个经典问题,即缓存更新机制(Cache Replacement Algorithms )。典型的缓存更新机制包括:近期最少使用算法(LRU)、最不经常使用算法(LFU)。这两种缓存更新机制的典型实现是:启动一个后台进程,定期清理最近没有使用的,或者在一段时间内最少使用的数据。由于存在缓存驱逐机制,当一个请求在没有命中缓存时,业务层需要从持久层中获取信息并更新缓存,提高一致性。

挑战

分布式缓存给缓存更新机制带来了新的问题:

  • 数据一致性低。分布式缓存中键值数量巨大,从而导致LRU或者LFU算法更新周期很长。在分布式缓存中,拿LRU算法举例,其典型做法是为每个Key值设置一个生存时间(TTL),生存时间到期后将该键值从缓存中驱逐除去。考虑到分布式缓存中庞大的键值数量,生存时间往往会设置的比较长,这就导致缓存和持久层数据不一致时间很长。如果生存时间设置过短,大量请求无法命中缓存被迫读取持久层,系统响应时间会急剧恶化。
  • 新数据不可用。在很多场景下,由于分布式缓存和持久层的访问性能相差太大,在缓存不命中的情况下,一些应用层服务不会尝试读取持久层,而直接返回空结果。漫长的缓存更新周期意味着新数据的可用性就被牺牲了。从统计的角度来讲,新键值需要等待半个更新周期才会可用。

构思

根据上面的分析,分布式缓存需要解决的问题是:在保证读取性能的前提下,尽可能地提高老数据的一致性和新数据的可用性。如果仍然假定最近被访问的键值最有可能被再次访问(这是LRU或者LFU成立的前提),键值每次被访问后触发一次异步更新就是提高可用性和一致性最早的时机。无论是高性能要求还是业务解耦都要求缓存读取和缓存更新分开,所以我们应该构建一个单独的集中的缓存更新服务。集中进行缓存更新的另外一个好处来自于频率控制。由于在一段时间内,很多类型访问键值的数量满足高斯分布,短时间内重复对同一个键值进行更新Cache并不会带来明显的好处,甚至造成缓存性能的下降。通过控制同一键值的更新频率可以大大缓解该问题,同时有利于提高整体数据的一致性,参见“排重优化”。

综上所述,业务访问方需要把请求键值快速传输给缓存更新方,它们之间不关心对方的业务。要快速、高性能地实现大量请求键值消息的传输,高性能分布式消息中间件就是一个可选项。这三方一起组成了一个典型的分布式队列编程模型。

架构

如下图,所有的业务请求方作为生产者,在返回业务代码处理之前将请求键值写入高性能队列。Cache Updater作为消费者从队列中读取请求键值,将持久层中数据更新到缓存中。
分布式缓存更新
采用此架构,我们可以在如下方面做进一步优化:

  • 提高可扩展性,如果一个Cache Updater在性能上无法满足要求,可以对键值进行主题分区(Topic Partition)进行并行缓存更新,即采用发布订阅模式以提高可扩展性(Scalability)。
  • 更新频率控制。缓存更新都集中处理,对于发布订阅模式,同一类主题(Topic)的键值集中处理。Cache Updater可以控制对同一键值的在短期内的更新频率(参见下文排重优化)。

后台任务处理

典型的后台任务处理应用包括工单处理、火车票预订系统、机票选座等。我们所面对的问题是为运营人员创建工单。一次可以为多个运营人员创建多个工单。这个应用场景和火车票购买非常类似。工单相对来说更加抽象,所以,下文会结合火车票购买和运营人员工单分配这两种场景同时讲解。典型的工单创建要经历两个阶段:数据筛选阶段、工单创建阶段。例如,在火车票预订场景,数据筛选阶段用户选择特定时间、特定类型的火车,而在工单创建阶段,用户下单购买火车票。

挑战

工单创建往往会面临如下挑战:

  • 数据一致性问题。以火车票预订为例,用户筛选火车票和最终购买之间往往有一定的时延,意味着两个操作之间数据是不一致的。在筛选阶段,工程师们需决定是否进行车票锁定,如果不锁定,则无法保证出票成功。反之,如果在筛选地时候锁定车票,则会大大降低系统效率和出票吞吐量。
  • 约束问题。工单创建需要满足很多约束,主要包含两种类型:动态约束,与操作者的操作行为有关,例如购买几张火车票的决定往往发生在筛选最后阶段。隐性约束,这种约束很难通过界面进行展示,例如一个用户购买了5张火车票,这些票应该是在同一个车厢的临近位置。
  • 优化问题。工单创建往往是约束下的优化,这是典型的统筹优化问题,而统筹优化往往需要比较长的时间。
  • 响应时间问题。对于多任务工单,一个请求意味着多个任务产生。这些任务的创建往往需要遵循事务性原则,即All or Nothing。在数据层面,这意味着工单之间需要满足串行化需求(Serializability)。大数据量的串行化往往意味着锁冲突延迟甚至失败。无论是延迟机制所导致的长时延,还是高创建失败率,都会大大伤害用户体验。

构思

如果将用户筛选的最终规则做为消息存储下来,并发送给工单创建系统。此时,工单创建系统将具备创建工单所需的全局信息,具备在满足各种约束的条件下进行统筹优化的能力。如果工单创建阶段采用单实例部署,就可以避免数据锁定问题,同时也意味着没有锁冲突,所以也不会有死锁或任务延迟问题。
居于以上思路,在多工单处理系统的模型中,筛选阶段的规则创建系统将充当生产者角色,工单创建系统将充当消费者角色,筛选规则将作为消息在两者之间进行传递。这就是典型的分布式队列编程架构。根据工单创建量的不同,可以采用数据库或开源的分布式消息中间件作为分布式队列。

架构

该架构流程如下图:

  • 用户首选进行规则创建,这个过程主要是一些搜索筛选操作;
  • 用户点击工单创建,TicketRule Generator将把所有的筛选性组装成规则消息并发送到队列里面去;
  • Ticket Generator作为一个消费者,实时从队列中读取工单创建请求,开始真正创建工单。
    工单创建
    采用该架构,我们在数据锁定、运筹优化、原子性问题都能得到比较好成果:
  • 数据锁定推迟到工单创建阶段,可以减少数据锁定范围,最大程度的降低工单创建对其他在线操作的影响范围。
  • 如果需要进行统筹优化,可以将Ticket Generator以单例模式进行部署(参见单例服务优化)。这样,Ticket Generator可以读取一段时间内的工单请求,进行全局优化。例如,在我们的项目中,在某种条件下,运营人员需要满足分级公平原则,即相同级别的运营人员的工单数量应该接近,不同级别的运营人员工单数量应该有所区分。如果不集中进行统筹优化,实现这种优化规则将会很困难。
  • 保障了约束完整性。例如,在我们的场景里面,每个运营人员每天能够处理的工单是有数量限制的,如果采用并行处理的方式,这种完整性约束将会很难实施。

预告

下周我们会推出优化篇,重点阐述在工程师在运用分布式队列编程构架的时候,在生产者、分布式队列以及消费者这三个环节的注意点以及优化建议,欢迎关注!

参考资料

[1] RabbitMQ, Highly Available Queues.
[2] IBM Knowledge Center, Introduction to message queuing.
[3] Wikipedia, Serializability.
[4] Hadoop, ZooKeeper Recipes and Solutions.
[5] Apache Kafka.
[6] Lamport L, Paxos Made Simple.

原文出自:https://tech.meituan.com/archives

分布式系统互斥性与幂等性问题的分析与解决

美团点评阅读(1954)

前言

随着互联网信息技术的飞速发展,数据量不断增大,业务逻辑也日趋复杂,对系统的高并发访问、海量数据处理的场景也越来越多。如何用较低成本实现系统的高可用、易伸缩、可扩展等目标就显得越发重要。为了解决这一系列问题,系统架构也在不断演进。传统的集中式系统已经逐渐无法满足要求,分布式系统被使用在更多的场景中。

分布式系统由独立的服务器通过网络松散耦合组成。在这个系统中每个服务器都是一台独立的主机,服务器之间通过内部网络连接。分布式系统有以下几个特点:

  • 可扩展性:可通过横向水平扩展提高系统的性能和吞吐量。
  • 高可靠性:高容错,即使系统中一台或几台故障,系统仍可提供服务。
  • 高并发性:各机器并行独立处理和计算。
  • 廉价高效:多台小型机而非单台高性能机。

然而,在分布式系统中,其环境的复杂度、网络的不确定性会造成诸如时钟不一致、“拜占庭将军问题”(Byzantine failure)等。存在于集中式系统中的机器宕机、消息丢失等问题也会在分布式环境中变得更加复杂。

基于分布式系统的这些特征,有两种问题逐渐成为了分布式环境中需要重点关注和解决的典型问题:

  • 互斥性问题。
  • 幂等性问题。

今天我们就针对这两个问题来进行分析。

互斥性问题

先看两个常见的例子:

例1:某服务记录关键数据X,当前值为100。A请求需要将X增加200;同时,B请求需要将X减100。
在理想的情况下,A先读取到X=100,然后X增加200,最后写入X=300。B请求接着从读取X=300,减少100,最后写入X=200。
然而在真实情况下,如果不做任何处理,则可能会出现:A和B同时读取到X=100;A写入之前B读取到X;B比A先写入等等情况。

例2:某服务提供一组任务,A请求随机从任务组中获取一个任务;B请求随机从任务组中获取一个任务。
在理想的情况下,A从任务组中挑选一个任务,任务组删除该任务,B从剩下的的任务中再挑一个,任务组删除该任务。
同样的,在真实情况下,如果不做任何处理,可能会出现A和B挑中了同一个任务的情况。

以上的两个例子,都存在操作互斥性的问题。互斥性问题用通俗的话来讲,就是对共享资源的抢占问题。如果不同的请求对同一个或者同一组资源读取并修改时,无法保证按序执行,无法保证一个操作的原子性,那么就很有可能会出现预期外的情况。因此操作的互斥性问题,也可以理解为一个需要保证时序性、原子性的问题。

在传统的基于数据库的架构中,对于数据的抢占问题往往是通过数据库事务(ACID)来保证的。在分布式环境中,出于对性能以及一致性敏感度的要求,使得分布式锁成为了一种比较常见而高效的解决方案。

事实上,操作互斥性问题也并非分布式环境所独有,在传统的多线程、多进程情况下已经有了很好的解决方案。因此在研究分布式锁之前,我们先来分析下这两种情况的解决方案,以期能够对分布式锁的解决方案提供一些实现思路。

多线程环境解决方案及原理

解决方案

《Thinking in Java》书中写到:

基本上所有的并发模式在解决线程冲突问题的时候,都是采用序列化访问共享资源的方案。

在多线程环境中,线程之间因为公用一些存储空间,冲突问题时有发生。解决冲突问题最普遍的方式就是用互斥锁把该资源或对该资源的操作保护起来。

Java JDK中提供了两种互斥锁Lock和synchronized。不同的线程之间对同一资源进行抢占,该资源通常表现为某个类的普通成员变量。因此,利用ReentrantLock或者synchronized将共享的变量及其操作锁住,即可基本解决资源抢占的问题。

下面来简单聊一聊两者的实现原理。

原理

ReentrantLock

ReentrantLock主要利用CAS+CLH队列来实现。它支持公平锁和非公平锁,两者的实现类似。

  • CAS:Compare and Swap,比较并交换。CAS有3个操作数:内存值V、预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。该操作是一个原子操作,被广泛的应用在Java的底层实现中。在Java中,CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现。
  • CLH队列:带头结点的双向非循环链表(如下图所示):

CLH队列

ReentrantLock的基本实现可以概括为:先通过CAS尝试获取锁。如果此时已经有线程占据了锁,那就加入CLH队列并且被挂起。当锁被释放之后,排在CLH队列队首的线程会被唤醒,然后CAS再次尝试获取锁。在这个时候,如果:

  • 非公平锁:如果同时还有另一个线程进来尝试获取,那么有可能会让这个线程抢先获取;
  • 公平锁:如果同时还有另一个线程进来尝试获取,当它发现自己不是在队首的话,就会排到队尾,由队首的线程获取到锁。

下面分析下两个片段:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在尝试获取锁的时候,会先调用上面的方法。如果状态为0,则表明此时无人占有锁。此时尝试进行set,一旦成功,则成功占有锁。如果状态不为0,再判断是否是当前线程获取到锁。如果是的话,将状态+1,因为此时就是当前线程,所以不用CAS。这也就是可重入锁的实现原理。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

该方法是在尝试获取锁失败加入CHL队尾之后,如果发现前序节点是head,则CAS再尝试获取一次。否则,则会根据前序节点的状态判断是否需要阻塞。如果需要阻塞,则调用LockSupport的park方法阻塞该线程。

synchronized

在Java语言中存在两种内建的synchronized语法:synchronized语句、synchronized方法。

  • synchronized语句:当源代码被编译成字节码的时候,会在同步块的入口位置和退出位置分别插入monitorenter和monitorexit字节码指令;
  • synchronized方法:在Class文件的方法表中将该方法的access_flags字段中的synchronized标志位置1。这个在specification中没有明确说明。

在Java虚拟机的specification中,有关于monitorenter和monitorexit字节码指令的详细描述:http://docs.oracle.com/Javase/specs/jvms/se7/html/jvms-6.html#jvms-6.5.monitorenter

monitorenter

The objectref must be of type reference.
Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:

  • If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
  • If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
  • If another thread already owns the monitor associated with objectref, the thread blocks until the monitor’s entry count is zero, then tries again to gain ownership.

每个对象都有一个锁,也就是监视器(monitor)。当monitor被占有时就表示它被锁定。线程执行monitorenter指令时尝试获取对象所对应的monitor的所有权,过程如下:

  • 如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者;
  • 如果线程已经拥有了该monitor,只是重新进入,则进入monitor的进入数加1;
  • 如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。
monitorexit

The objectref must be of type reference.
The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref.
The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.

执行monitorexit的线程必须是相应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个monitor的所有权。

在JDK1.6及其之前的版本中monitorenter和monitorexit字节码依赖于底层的操作系统的Mutex Lock来实现的,但是由于使用Mutex Lock需要将当前线程挂起并从用户态切换到内核态来执行,这种切换的代价是非常昂贵的。然而在现实中的大部分情况下,同步方法是运行在单线程环境(无锁竞争环境)。如果每次都调用Mutex Lock将严重的影响程序的性能。因此在JDK 1.6之后的版本中对锁的实现做了大量的优化,这些优化在很大程度上减少或避免了Mutex Lock的使用。

多进程的解决方案

在多道程序系统中存在许多进程,它们共享各种资源,然而有很多资源一次只能供一个进程使用,这便是临界资源。多进程中的临界资源大致上可以分为两类,一类是物理上的真实资源,如打印机;一类是硬盘或内存中的共享数据,如共享内存等。而进程内互斥访问临界资源的代码被称为临界区。

针对临界资源的互斥访问,JVM层面的锁就已经失去效力了。在多进程的情况下,主要还是利用操作系统层面的进程间通信原理来解决临界资源的抢占问题。比较常见的一种方法便是使用信号量(Semaphores)。

信号量在POSIX标准下有两种,分别为有名信号量和无名信号量。无名信号量通常保存在共享内存中,而有名信号量是与一个特定的文件名称相关联。信号量是一个整数变量,有计数信号量和二值信号量两种。对信号量的操作,主要是P操作(wait)和V操作(signal)。

  • P操作:先检查信号量的大小,若值大于零,则将信号量减1,同时进程获得共享资源的访问权限,继续执行;若小于或者等于零,则该进程被阻塞后,进入等待队列。
  • V操作:该操作将信号量的值加1,如果有进程阻塞着等待该信号量,那么其中一个进程将被唤醒。

举个例子,设信号量为1,当一个进程A在进入临界区之前,先进行P操作。发现值大于零,那么就将信号量减为0,进入临界区执行。此时,若另一个进程B也要进去临界区,进行P操作,发现信号量等于0,则会被阻塞。当进程A退出临界区时,会进行V操作,将信号量的值加1,并唤醒阻塞的进程B。此时B就可以进入临界区了。

这种方式,其实和多线程环境下的加解锁非常类似。因此用信号量处理临界资源抢占,也可以简单地理解为对临界区进行加锁。

通过上面的一些了解,我们可以概括出解决互斥性问题,即资源抢占的基本方式为:

对共享资源的操作前后(进入退出临界区)加解锁,保证不同线程或进程可以互斥有序的操作资源。

加解锁方式,有显式的加解锁,如ReentrantLock或信号量;也有隐式的加解锁,如synchronized。那么在分布式环境中,为了保证不同JVM不同主机间不会出现资源抢占,那么同样只要对临界区加解锁就可以了。

然而在多线程和多进程中,锁已经有比较完善的实现,直接使用即可。但是在分布式环境下,就需要我们自己来实现分布式锁。

分布式环境下的解决方案——分布式锁

首先,我们来看看分布式锁的基本条件。

分布式锁条件

基本条件

再回顾下多线程和多进程环境下的锁,可以发现锁的实现有很多共通之处,它们都需要满足一些最基本的条件:

  1. 需要有存储锁的空间,并且锁的空间是可以访问到的。
  2. 锁需要被唯一标识。
  3. 锁要有至少两种状态。

仔细分析这三个条件:

  • 存储空间

锁是一个抽象的概念,锁的实现,需要依存于一个可以存储锁的空间。在多线程中是内存,在多进程中是内存或者磁盘。更重要的是,这个空间是可以被访问到的。多线程中,不同的线程都可以访问到堆中的成员变量;在多进程中,不同的进程可以访问到共享内存中的数据或者存储在磁盘中的文件。但是在分布式环境中,不同的主机很难访问对方的内存或磁盘。这就需要一个都能访问到的外部空间来作为存储空间。

最普遍的外部存储空间就是数据库了,事实上也确实有基于数据库做分布式锁(行锁、version乐观锁),如quartz集群架构中就有所使用。除此以外,还有各式缓存如Redis、Tair、Memcached、Mongodb,当然还有专门的分布式协调服务Zookeeper,甚至是另一台主机。只要可以存储数据、锁在其中可以被多主机访问到,那就可以作为分布式锁的存储空间。

  • 唯一标识

不同的共享资源,必然需要用不同的锁进行保护,因此相应的锁必须有唯一的标识。在多线程环境中,锁可以是一个对象,那么对这个对象的引用便是这个唯一标识。多进程环境中,信号量在共享内存中也是由引用来作为唯一的标识。但是如果不在内存中,失去了对锁的引用,如何唯一标识它呢?上文提到的有名信号量,便是用硬盘中的文件名作为唯一标识。因此,在分布式环境中,只要给这个锁设定一个名称,并且保证这个名称是全局唯一的,那么就可以作为唯一标识。

  • 至少两种状态

为了给临界区加锁和解锁,需要存储两种不同的状态。如ReentrantLock中的status,0表示没有线程竞争,大于0表示有线程竞争;信号量大于0表示可以进入临界区,小于等于0则表示需要被阻塞。因此只要在分布式环境中,锁的状态有两种或以上:如有锁、没锁;存在、不存在等等,均可以实现。

有了这三个条件,基本就可以实现一个简单的分布式锁了。下面以数据库为例,实现一个简单的分布式锁:
数据库表,字段为锁的ID(唯一标识),锁的状态(0表示没有被锁,1表示被锁)。
伪代码为:

lock = mysql.get(id);
while(lock.status == 1) {
    sleep(100);
}
mysql.update(lock.status = 1);
doSomething();
mysql.update(lock.status = 0);

问题

以上的方式即可以实现一个粗糙的分布式锁,但是这样的实现,有没有什么问题呢?

  • 问题1:锁状态判断原子性无法保证
    从读取锁的状态,到判断该状态是否为被锁,需要经历两步操作。如果不能保证这两步的原子性,就可能导致不止一个请求获取到了锁,这显然是不行的。因此,我们需要保证锁状态判断的原子性。

  • 问题2:网络断开或主机宕机,锁状态无法清除
    假设在主机已经获取到锁的情况下,突然出现了网络断开或者主机宕机,如果不做任何处理该锁将仍然处于被锁定的状态。那么之后所有的请求都无法再成功抢占到这个锁。因此,我们需要在持有锁的主机宕机或者网络断开的时候,及时的释放掉这把锁。

  • 问题3:无法保证释放的是自己上锁的那把锁
    在解决了问题2的情况下再设想一下,假设持有锁的主机A在临界区遇到网络抖动导致网络断开,分布式锁及时的释放掉了这把锁。之后,另一个主机B占有了这把锁,但是此时主机A网络恢复,退出临界区时解锁。由于都是同一把锁,所以A就会将B的锁解开。此时如果有第三个主机尝试抢占这把锁,也将会成功获得。因此,我们需要在解锁时,确定自己解的这个锁正是自己锁上的。

进阶条件

如果分布式锁的实现,还能再解决上面的三个问题,那么就可以算是一个相对完整的分布式锁了。然而,在实际的系统环境中,还会对分布式锁有更高级的要求。

  1. 可重入:线程中的可重入,指的是外层函数获得锁之后,内层也可以获得锁,ReentrantLock和synchronized都是可重入锁;衍生到分布式环境中,一般仍然指的是线程的可重入,在绝大多数分布式环境中,都要求分布式锁是可重入的。
  2. 惊群效应(Herd Effect):在分布式锁中,惊群效应指的是,在有多个请求等待获取锁的时候,一旦占有锁的线程释放之后,如果所有等待的方都同时被唤醒,尝试抢占锁。但是这样的情况会造成比较大的开销,那么在实现分布式锁的时候,应该尽量避免惊群效应的产生。
  3. 公平锁和非公平锁:不同的需求,可能需要不同的分布式锁。非公平锁普遍比公平锁开销小。但是业务需求如果必须要锁的竞争者按顺序获得锁,那么就需要实现公平锁。
  4. 阻塞锁和自旋锁:针对不同的使用场景,阻塞锁和自旋锁的效率也会有所不同。阻塞锁会有上下文切换,如果并发量比较高且临界区的操作耗时比较短,那么造成的性能开销就比较大了。但是如果临界区操作耗时比较长,一直保持自旋,也会对CPU造成更大的负荷。

保留以上所有问题和条件,我们接下来看一些比较典型的实现方案。

典型实现

ZooKeeper的实现

ZooKeeper(以下简称“ZK”)中有一种节点叫做顺序节点,假如我们在/lock/目录下创建3个节点,ZK集群会按照发起创建的顺序来创建节点,节点分别为/lock/0000000001、/lock/0000000002、/lock/0000000003。

ZK中还有一种名为临时节点的节点,临时节点由某个客户端创建,当客户端与ZK集群断开连接,则该节点自动被删除。EPHEMERAL_SEQUENTIAL为临时顺序节点。

根据ZK中节点是否存在,可以作为分布式锁的锁状态,以此来实现一个分布式锁,下面是分布式锁的基本逻辑:

  1. 客户端调用create()方法创建名为“/dlm-locks/lockname/lock-”的临时顺序节点。
  2. 客户端调用getChildren(“lockname”)方法来获取所有已经创建的子节点。
  3. 客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
  4. 如果创建的节点不是所有节点中需要最小的,那么则监视比自己创建节点的序列号小的最大的节点,进入等待。直到下次监视的子节点变更的时候,再进行子节点的获取,判断是否获取锁。

释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可,不过也仍需要考虑删除节点失败等异常情况。

开源的基于ZK的Menagerie的源码就是一个典型的例子:https://github.com/sfines/menagerie

Menagerie中的lock首先实现了可重入锁,利用ThreadLocal存储进入的次数,每次加锁次数加1,每次解锁次数减1。如果判断出是当前线程持有锁,就不用走获取锁的流程。

通过tryAcquireDistributed方法尝试获取锁,循环判断前序节点是否存在,如果存在则监视该节点并且返回获取失败。如果前序节点不存在,则再判断更前一个节点。如果判断出自己是第一个节点,则返回获取成功。

为了在别的线程占有锁的时候阻塞,代码中使用JUC的condition来完成。如果获取尝试锁失败,则进入等待且放弃localLock,等待前序节点唤醒。而localLock是一个本地的公平锁,使得condition可以公平的进行唤醒,配合循环判断前序节点,实现了一个公平锁。

这种实现方式非常类似于ReentrantLock的CHL队列,而且zk的临时节点可以直接避免网络断开或主机宕机,锁状态无法清除的问题,顺序节点可以避免惊群效应。这些特性都使得利用ZK实现分布式锁成为了最普遍的方案之一。

Redis的实现

Redis的分布式缓存特性使其成为了分布式锁的一种基础实现。通过Redis中是否存在某个锁ID,则可以判断是否上锁。为了保证判断锁是否存在的原子性,保证只有一个线程获取同一把锁,Redis有SETNX(即SET if Not
eXists)和GETSET(先写新值,返回旧值,原子性操作,可以用于分辨是不是首次操作)操作。

为了防止主机宕机或网络断开之后的死锁,Redis没有ZK那种天然的实现方式,只能依赖设置超时时间来规避。

以下是一种比较普遍但不太完善的Redis分布式锁的实现步骤(与下图一一对应):

  1. 线程A发送SETNX lock.orderid 尝试获得锁,如果锁不存在,则set并获得锁。
  2. 如果锁存在,则再判断锁的值(时间戳)是否大于当前时间,如果没有超时,则等待一下再重试。
  3. 如果已经超时了,在用GETSET lock.{orderid} 来尝试获取锁,如果这时候拿到的时间戳仍旧超时,则说明已经获得锁了。
  4. 如果在此之前,另一个线程C快一步执行了上面的操作,那么A拿到的时间戳是个未超时的值,这时A没有如期获得锁,需要再次等待或重试。

redis分布式锁实现

该实现还有一个需要考虑的问题是全局时钟问题,由于生产环境主机时钟不能保证完全同步,对时间戳的判断也可能会产生误差。

以上是Redis的一种常见的实现方式,除此以外还可以用SETNX+EXPIRE来实现。Redisson是一个官方推荐的Redis客户端并且实现了很多分布式的功能。它的分布式锁就提供了一种更完善的解决方案,源码:https://github.com/mrniko/redisson

Tair的实现

Tair和Redis的实现类似,Tair客户端封装了一个expireLock的方法:通过锁状态和过期时间戳来共同判断锁是否存在,只有锁已经存在且没有过期的状态才判定为有锁状态。在有锁状态下,不能加锁,能通过大于或等于过期时间的时间戳进行解锁。

采用这样的方式,可以不用在Value中存储时间戳,并且保证了判断是否有锁的原子性。更值得注意的是,由于超时时间是由Tair判断,所以避免了不同主机时钟不一致的情况。

以上的几种分布式锁实现方式,都是比较常见且有些已经在生产环境中应用。随着应用环境越来越复杂,这些实现可能仍然会遇到一些挑战。

  • 强依赖于外部组件:分布式锁的实现都需要依赖于外部数据存储如ZK、Redis等等,因此一旦这些外部组件出现故障,那么分布式锁就不可用了。
  • 无法完全满足需求:不同分布式锁的实现,都有相应的特点,对于一些需求并不能很好的满足,如实现公平锁、给等待锁加超时时间等等。

基于以上问题,结合多种实现方式,我们开发了Cerberus(得名自希腊神话里守卫地狱的猛犬),致力于提供灵活可靠的分布式锁。

Cerberus分布式锁

Cerberus有以下几个特点。

特点一:一套接口多种引擎

Cerberus分布式锁使用了多种引擎实现方式(Tair、ZK、未来支持Redis),支持使用方自主选择所需的一种或多种引擎。这样可以结合引擎特点,选择符合实际业务需求和系统架构的方式。

Cerberus分布式锁将不同引擎的接口抽象为一套,屏蔽了不同引擎的实现细节。使得使用方可以专注于业务逻辑,也可以任意选择并切换引擎而不必更改任何的业务代码。

如果使用方选择了一种以上的引擎,那么以配置顺序来区分主副引擎。以下是使用主引擎的推荐:

功能需求TairZK
并发量高
响应时间敏感
临界区执行时间长
公平锁
非公平锁
读写锁

特点二:使用灵活、学习成本低

下面是Cerberus的lock方法,这些方法和JUC的ReentrantLock的方式保持一致,使用非常灵活且不需要额外的学习时间。

  • void lock();
    获取锁,如果锁被占用,将禁用当前线程,并且在获得锁之前,该线程将一直处于阻塞状态。
  • boolean tryLock();
    仅在调用时锁为空闲状态才获取该锁。
    如果锁可用,则获取锁,并立即返回值 true。如果锁不可用,则此方法将立即返回值 false。
  • boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。
    如果在给定时间内锁可用,则获取锁,并立即返回值 true。如果在给定时间内锁一直不可用,则此方法将立即返回值false。
  • void lockInterruptibly() throws InterruptedException;
    获取锁,如果锁被占用,则一直等待直到线程被中断或者获取到锁。
  • void unlock();
    释放当前持有的锁。

特点三:支持一键降级

Cerberus提供了实时切换引擎的接口:

  • String switchEngine()
    转换分布式锁引擎,按配置的引擎的顺序循环转换。
    返回值:返回当前的engine名字,如:”zk”。
  • String switchEngine(String engineName)
    转换分布式锁引擎,切换为指定的引擎。
    参数:engineName – 引擎的名字,同配置bean的名字,”zk”/”tair”。
    返回值:返回当前的engine名字,如:”zk”。

当使用方选择了两种引擎,平时分布式锁会工作在主引擎上。一旦所依赖的主引擎出现故障,那么使用方可以通过自动或者手动方式调用该切换引擎接口,平滑的将分布式锁切换到另一个引擎上以将风险降到最低。自动切换方式可以利用Hystrix实现。手动切换推荐的一个方案则是使用美团点评基于Zookeeper的基础组件MCC,通过监听MCC配置项更改,来达到手动将分布式系统所有主机同步切换引擎的目的。需要注意的是,切换引擎目前并不会迁移原引擎已有的锁。这样做的目的是出于必要性、系统复杂度和可靠性的综合考虑。在实际情况下,引擎故障到切换引擎,尤其是手动切换引擎的时间,要远大于分布式锁的存活时间。作为较轻量级的Cerberus来说,迁移锁会带来不必要的开销以及较高的系统复杂度。鉴于此,如果想要保证在引擎故障后的绝对可靠,那么则需要结合其他方案来进行处理。

除此以外,Cerberus还提供了内置公用集群,免去搭建和配置集群的烦恼。Cerberus也有一套完善的应用授权机制,以此防止业务方未经评估使用,对集群造成影响。

目前,Cerberus分布式锁已经持续迭代了8个版本,先后在美团点评多个项目中稳定运行。

幂等性问题

所谓幂等,简单地说,就是对接口的多次调用所产生的结果和调用一次是一致的。扩展一下,这里的接口,可以理解为对外发布的HTTP接口或者Thrift接口,也可以是接收消息的内部接口,甚至是一个内部方法或操作。

那么我们为什么需要接口具有幂等性呢?设想一下以下情形:

  • 在App中下订单的时候,点击确认之后,没反应,就又点击了几次。在这种情况下,如果无法保证该接口的幂等性,那么将会出现重复下单问题。
  • 在接收消息的时候,消息推送重复。如果处理消息的接口无法保证幂等,那么重复消费消息产生的影响可能会非常大。

在分布式环境中,网络环境更加复杂,因前端操作抖动、网络故障、消息重复、响应速度慢等原因,对接口的重复调用概率会比集中式环境下更大,尤其是重复消息在分布式环境中很难避免。Tyler Treat也在《You Cannot Have Exactly-Once Delivery》一文中提到:

Within the context of a distributed system, you cannot have exactly-once message delivery.

分布式环境中,有些接口是天然保证幂等性的,如查询操作。有些对数据的修改是一个常量,并且无其他记录和操作,那也可以说是具有幂等性的。其他情况下,所有涉及对数据的修改、状态的变更就都有必要防止重复性操作的发生。通过间接的实现接口的幂等性来防止重复操作所带来的影响,成为了一种有效的解决方案。

GTIS

GTIS就是这样的一个解决方案。它是一个轻量的重复操作关卡系统,它能够确保在分布式环境中操作的唯一性。我们可以用它来间接保证每个操作的幂等性。它具有如下特点:

  • 高效:低延时,单个方法平均响应时间在2ms内,几乎不会对业务造成影响;
  • 可靠:提供降级策略,以应对外部存储引擎故障所造成的影响;提供应用鉴权,提供集群配置自定义,降低不同业务之间的干扰;
  • 简单:接入简捷方便,学习成本低。只需简单的配置,在代码中进行两个方法的调用即可完成所有的接入工作;
  • 灵活:提供多种接口参数、使用策略,以满足不同的业务需求。

实现原理

基本原理

GTIS的实现思路是将每一个不同的业务操作赋予其唯一性。这个唯一性是通过对不同操作所对应的唯一的内容特性生成一个唯一的全局ID来实现的。基本原则为:相同的操作生成相同的全局ID;不同的操作生成不同的全局ID。

生成的全局ID需要存储在外部存储引擎中,数据库、Redis亦或是Tair等等均可实现。考虑到Tair天生分布式和持久化的优势,目前的GTIS存储在Tair中。其相应的key和value如下:

  • key:将对于不同的业务,采用APP_KEY+业务操作内容特性生成一个唯一标识trans_contents。然后对唯一标识进行加密生成全局ID作为Key。
  • value:current_timestamp + trans_contents,current_timestamp用于标识当前的操作线程。

判断是否重复,主要利用Tair的SETNX方法,如果原来没有值则set且返回成功,如果已经有值则返回失败。

内部流程

GTIS的内部实现流程为:

  1. 业务方在业务操作之前,生成一个能够唯一标识该操作的transContents,传入GTIS;
  2. GTIS根据传入的transContents,用MD5生成全局ID;
  3. GTIS将全局ID作为key,current_timestamp+transContents作为value放入Tair进行setNx,将结果返回给业务方;
  4. 业务方根据返回结果确定能否开始进行业务操作;
  5. 若能,开始进行操作;若不能,则结束当前操作;
  6. 业务方将操作结果和请求结果传入GTIS,系统进行一次请求结果的检验;
  7. 若该次操作成功,GTIS根据key取出value值,跟传入的返回结果进行比对,如果两者相等,则将该全局ID的过期时间改为较长时间;
  8. GTIS返回最终结果。

实现难点

GTIS的实现难点在于如何保证其判断重复的可靠性。由于分布式环境的复杂度和业务操作的不确定性,在上一章节分布式锁的实现中考虑的网络断开或主机宕机等等问题,同样需要在GTIS中设法解决。这里列出几个典型的场景:

  • 如果操作执行失败,理想的情况应该是另一个相同的操作可以立即进行。因此,需要对业务方的操作结果进行判断,如果操作失败,那么就需要立即删除该全局ID;
  • 如果操作超时或主机宕机,当前的操作无法告知GTIS操作是否成功。那么我们必须引入超时机制,一旦长时间获取不到业务方的操作反馈,那么也需要该全局ID失效;
  • 结合上两个场景,既然全局ID会失效并且可能会被删除,那就需要保证删除的不是另一个相同操作的全局ID。这就需要将特殊的标识记录下来,并由此来判断。这里所用的标识为当前时间戳。

可以看到,解决这些问题的思路,也和上一章节中的实现有很多类似的地方。除此以外,还有更多的场景需要考虑和解决,所有分支流程如下:
GTIS原理

使用说明

使用时,业务方只需要在操作的前后调用GTIS的前置方法和后置方法,如下图所示。如果前置方法返回可进行操作,则说明此时无重复操作,可以进行。否则则直接结束操作。

redis分布式锁实现

使用方需要考虑的主要是下面两个参数:

  • 空间全局性:业务方输入的能够标志操作唯一性的内容特性,可以是唯一性的String类型的ID,也可以是map、POJO等形式。如订单ID等
  • 时间全局性:确定在多长时间内不允许重复,1小时内还是一个月内亦或是永久。

此外,GTIS还提供了不同的故障处理策略和重试机制,以此来降低外部存储引擎异常对系统造成的影响。

目前,GTIS已经持续迭代了7个版本,距离第一个版本有近1年之久,先后在美团点评多个项目中稳定运行。

结语

在分布式环境中,操作互斥性问题和幂等性问题非常普遍。经过分析,我们找出了解决这两个问题的基本思路和实现原理,给出了具体的解决方案。

针对操作互斥性问题,常见的做法便是通过分布式锁来处理对共享资源的抢占。分布式锁的实现,很大程度借鉴了多线程和多进程环境中的互斥锁的实现原理。只要满足一些存储方面的基本条件,并且能够解决如网络断开等异常情况,那么就可以实现一个分布式锁。目前已经有基于Zookeeper和Redis等存储引擎的比较典型的分布式锁实现。但是由于单存储引擎的局限,我们开发了基于ZooKeeper和Tair的多引擎分布式锁Cerberus,它具有使用灵活方便等诸多优点,还提供了完善的一键降级方案。

针对操作幂等性问题,我们可以通过防止重复操作来间接的实现接口的幂等性。GTIS提供了一套可靠的解决方法:依赖于存储引擎,通过对不同操作所对应的唯一的内容特性生成一个唯一的全局ID来防止操作重复。

目前Cerberus分布式锁、GTIS都已应用在生产环境并平稳运行。两者提供的解决方案已经能够解决大多数分布式环境中的操作互斥性和幂等性的问题。值得一提的是,分布式锁和GTIS都不是万能的,它们对外部存储系统的强依赖使得在环境不那么稳定的情况下,对可靠性会造成一定的影响。在并发量过高的情况下,如果不能很好的控制锁的粒度,那么使用分布式锁也是不太合适的。总的来说,分布式环境下的业务场景纷繁复杂,要解决互斥性和幂等性问题还需要结合当前系统架构、业务需求和未来演进综合考虑。Cerberus分布式锁和GTIS也会持续不断地迭代更新,提供更多的引擎选择、更高效可靠的实现方式、更简捷的接入流程,以期满足更复杂的使用场景和业务需求。

原文出自:https://tech.meituan.com/archives

Spark Streaming + Elasticsearch构建App异常监控平台

美团点评阅读(1977)

本文已发表在《程序员》杂志2016年10月期。

如果在使用App时遇到闪退,你可能会选择卸载App、到应用商店怒斥开发者等方式来表达不满。但开发者也同样感到头疼,因为崩溃可能意味着用户流失、营收下滑。为了降低崩溃率,进而提升App质量,App开发团队需要实时地监控App异常。一旦发现严重问题,及时进行热修复,从而把损失降到最低。App异常监控平台,就是将这个方法服务化。

低成本

小型创业团队一般会选择第三方平台提供的异常监控服务。但中型以上规模的团队,往往会因为不想把核心数据共享给第三方平台,而选择独立开发。造轮子,首先要考虑的就是成本问题。我们选择了站在开源巨人的肩膀上,如图1所示。

 图1 数据流向示意图

Spark Streaming

每天来自客户端和服务器的大量异常信息,会源源不断的上报到异常平台的Kafka中,因此我们面临的是一个大规模流式数据处理问题。美团点评数据平台提供了Storm和Spark Streaming两种流式计算解决方案。我们主要考虑到团队之前在Spark批处理方面有较多积累,使用Spark Streaming成本较低,就选择了后者。

Elasticsearch

Elasticsearch(后文简称ES),是一个开源搜索引擎。不过在监控平台中,我们是当做“数据库”来使用的。为了降低展示层的接入成本,我们还使用了另一个开源项目ES SQL提供类SQL查询。ES的运维成本,相对 SQL on HBase方案也要低很多。整个项目开发只用了不到700行代码,开发维护成本还是非常低的。那如此“简单”的系统,可用性可以保证吗?

高可用

Spark Streaming + Kafka的组合,提供了“Exactly Once”保证:异常数据经过流式处理后,保证结果数据中(注:并不能保证处理过程中),每条异常最多出现一次,且最少出现一次。保证Exactly Once是实现24/7的高可用服务最困难的地方。在实际生产中会出现很多情况,对Exactly Once的保证提出挑战:

异常重启

Spark提供了Checkpoint功能,可以让程序再次启动时,从上一次异常退出的位置,重新开始计算。这就保证了即使发生异常情况,也可以实现每条数据至少写一次HDFS。再覆写相同的HDFS文件就保证了Exactly Once(注:并不是所有业务场景都允许覆写)。写ES的结果也一样可以保证Exactly Once。你可以把ES的索引,就当成HDFS文件一样来用:新建、删除、移动、覆写。
作为一个24/7运行的程序,在实际生产中,异常是很常见的,需要有这样的容错机制。但是否遇到所有异常,都要立刻挂掉再重启呢?显然不是,甚至在一些场景下,你即使重启了,还是会继续挂掉。我们的解决思路是:尽可能把异常包住,让异常发生时,暂时不影响服务。

 图 2 作业异常重启架构图

如图2所示,包住异常,并不意味可以忽略它,必须把异常收集到Spark Driver端,接入监控(报警)系统,人工判断问题的严重性,确定修复的优先级。
为了更好地掌控Spark Streaming服务的状态,我们还单独开发了一个作业调度(重启)工具。美团点评数据平台安全认证的有效期是7天,一般离线的批处理作业很少会运行超过这个时间,但Spark Streaming作业就不同了,它需要一直保持运行,所以作业只要超过7天就会出现异常。因为没有找到优雅的解决方案,只好粗暴地利用调度工具,每周重启刷新安全认证,来保证服务的稳定。

升级重导

Spark提供了2种读取Kafka的模式:“Receiver-based Approach”和“Direct Approach”。使用Receiver模式,在极端情况下会出现Receiver OOM问题。
使用Direct模式可以避免这个问题。我们使用的就是这种Low-level模式,但在一些情况下需要我们自己维护Kafka Offset:
升级代码:开启Checkpoint后,如果想改动代码,需要清空之前的Checkpoint目录后再启动,否则改动可能不会生效。但当这样做了之后,就会发现另一个问题——程序“忘记”上次读到了哪个位置,因为存储在Checkpoint中的Offset信息也一同被清空了。这种情况下,需要自己用ZooKeeper维护Kafka的Offset。
重导数据:重导数据的场景也是,当希望从之前的某一个时间点开始重新开始计算的时候,显然也需要自己维护时间和Offset的映射关系。
自己维护Offset的成本并不高,所以看起来Checkpoint功能很鸡肋。其实可以有一些特殊用法的,例如,因为Python不需要编译,所以如果使用的是PySpark,可以把主要业务逻辑写在提交脚本的外边,再使用Import调用。这样升级主要业务逻辑代码时,只要重启一下程序即可。网上有不少团队分享过升级代码的“黑科技”,这里不再展开。
实现24/7监控服务,我们不仅要解决纯稳定性问题,还要解决延迟问题。

低延迟

App异常监控,需要保证数据延迟在分钟级。
虽然Spark Streaming有着强大的分布式计算能力,但要满足用户角度的低延迟,可不是单纯的能计算完这么简单。

输入问题

iOS App崩溃时,会生成Crash Log,但其内容是一堆十六进制的内存地址,对开发者来说就是“天书”。只有经过“符号化”的Crash Log,开发者才能看懂。因为符号化需要在Mac环境下进行,而我们的Mac集群资源有限,不能符号化全部Crash Log。即使做了去重等优化,符号化后的数据流还是有延迟。每条异常信息中,包含N维数据,如果不做符号化只能拿到其中的M维。

 图 3 双延迟乱序数据流融合示意图

如图3所示,我们将数据源分为符号化数据流、未符号化数据流,可以看出两个数据流的相对延迟时间T较稳定。如果直接使用符号化后的数据流,那么全部N维数据都会延迟时间T。为了降低用户角度的延迟,我们根据经验加大了时间窗口:先存储未符号化的M维数据,等到拿到对应的符号化数据后,再覆写全部N维数据,这样就只有N-M维数据延迟时间T了。

输出问题

如果Spark Streaming计算结果只是写入HDFS,很难遇到什么性能问题。但你如果想写入ES,问题就来了。因为ES的写入速度大概是每秒1万行,只靠增加Spark Streaming的计算能力,很难突破这个瓶颈。
异常数据源的特点是数据量的波峰波谷相差巨大。由于我们使用了 Direct 模式,不会因为数据量暴涨而挂掉,但这样的“稳定”从用户角度看没有任何意义:短时间内,数据延迟会越来越大,暴增后新出现的异常无法及时报出来。为了解决这个问题,我们制定了一套服务降级方案。

 服务降级方案示意图

如图4所示,我们根据写ES的实际瓶颈K,对每个周期处理的全部数据N使用水塘抽样(比例K/N),保证始终不超过瓶颈。并在空闲时刻使用Spark批处理,将N-K部分从HDFS补写到ES。既然写ES这么慢,那我们为什么还要用ES呢?

高性能

开发者需要在监控平台上分析异常。实际分析场景可以抽象描述为:“实时 秒级 明细 聚合” 数据查询。
我们团队在使用的OLAP解决方案可以分为4种,它们各有各的优势:

  • SQL on HBase方案,例如:Phoenix、Kylin。我们团队从2015年Q1开始,陆续在SEM、SEO生产环境中使用Phoenix、Kylin至今。Phoenix算是一个“全能选手”,但更适合业务模式较固定的场景;Kylin是一个很不错的OLAP产品,但它的问题是不能很好支持实时查询和明细查询,因为它需要离线预聚合。另外,基于其他NoSQL的方案,基本大同小异,如果选择HBase,建议团队在HBase运维方面有一定积累。
  • SQL on HDFS方案,例如:Presto、Spark SQL。这两个产品,因为只能做到亚秒级查询,我们平时多用在数据挖掘的场景中。
  • 时序数据库方案,例如:Druid、OpenTSDB。OpenTSDB是我们旧版App异常监控系统使用过的方案,更适合做系统指标监控。
  • 搜索引擎方案,代表项目有ES。相对上面的3种方案,基于倒排索引的ES非常适合异常分析的场景,可以满足:实时、秒级、明细、聚合,全部4种需求。

ES在实际使用中的表现如何呢?

明细查询

支持明显查询,算是ES的主要特色,但因为是基于倒排索引的,明细查询的结果最多只能取到10000条。在异常分析中,使用明细查询的场景,其实就是追查异常Case,根据条件返回前100条就能满足需求了。例如:已知某设备出现了Crash,直接搜索这个设备的DeviceId就可以看到这个设备最近的异常数据。我们在生产环境中做到了95%的明细查询场景1秒内返回。

聚合查询

面对爆炸的异常信息,一味追求全是不现实,也是没必要的。开发者需要能快速发现关键问题。
因此平台需要支持多维度聚合查询,例如按模块版本机型城市等分类聚合,如图5所示。

 图 5 聚合查询页面截图

不用做优化,ES聚合查询的性能就已经可以满足需求。因此,我们只做了一些小的使用改进,例如:很多异常数据在各个维度的值都是相同的,做预聚合可以提高一些场景下的查询速度。开发者更关心最近48小时发生的异常,分离冷热数据,自动清理历史数据也有助于提升性能。最终在生产环境中,做到了90%的聚合查询场景1秒内返回。

可扩展

异常平台不止要监控App Crash,还要监控服务端的异常、性能等。不同业务的数据维度是不同的,相同业务的数据维度也会不断的变化,如果每次新增业务或维度都需要修改代码,那整套系统的升级维护成本就会很高。

维度

为了增强平台的可扩展性,我们做了全平台联动的动态维度扩展:如果App开发人员在日志中新增了一个“城市”维度,那么他不需要联系监控平台做项目排期,立刻就可以在平台中查询“城市”维度的聚合数据。只需要制定好数据收集、数据处理、数据展示之间的交互协议,做到动态维度扩展就很轻松了。需要注意的是,ES中需要聚合的维度,Index要设置为“not_analyzed”。
想要支持动态字段扩展,还要使用动态模板,样例如下:

{
    "mappings": {
        "es_type_name": {
            "dynamic_templates": [
                {
                    "template_1": {
                        "match": "*log*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "string"
                        }
                    }
                },
                {
                    "template_2": {
                        "match": "*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "string",
                            "index": "not_analyzed"
                        }
                    }
                }
            ]
        }
    }
}

资源

美团点评数据平台提供了Kafka、Spark、ES的集群,整套技术栈在资源上也是分布式可扩展的。
线上集群使用的版本:

  • kafka-0.8.2.0
  • spark-1.5.2
  • elasticsearch-2.1.1

原文出自:https://tech.meituan.com/archives

全球互联网技术架构,前沿架构参考

联系我们博客/网站内容提交