若谷学院
互联网公司技术架构分享

HDFS NameNode内存全景

美团点评阅读(2727)

一、概述

从整个HDFS系统架构上看,NameNode是其中最重要、最复杂也是最容易出现问题的地方,而且一旦NameNode出现故障,整个Hadoop集群就将处于不可服务的状态,同时随着数据规模和集群规模地持续增长,很多小量级时被隐藏的问题逐渐暴露出来。所以,从更高层次掌握NameNode的内部结构和运行机制尤其重要。除特别说明外,本文基于社区版本Hadoop-2.4.1[1][2],虽然2.4.1之后已经有多次版本迭代,但是基本原理相同。

NameNode管理着整个HDFS文件系统的元数据。从架构设计上看,元数据大致分成两个层次:Namespace管理层,负责管理文件系统中的树状目录结构以及文件与数据块的映射关系;块管理层,负责管理文件系统中文件的物理块与实际存储位置的映射关系BlocksMap,如图1所示[1]。Namespace管理的元数据除内存常驻外,也会周期Flush到持久化设备上FsImage文件;BlocksMap元数据只在内存中存在;当NameNode发生重启,首先从持久化设备中读取FsImage构建Namespace,之后根据DataNode的汇报信息重新构造BlocksMap。这两部分数据结构是占据了NameNode大部分JVM Heap空间。

HDFS结构图

除了对文件系统本身元数据的管理之外,NameNode还需要维护整个集群的机架及DataNode的信息、Lease管理以及集中式缓存引入的缓存管理等等。这几部分数据结构空间占用相对固定,且占用较小。

测试数据显示,Namespace目录和文件总量到2亿,数据块总量到3亿后,常驻内存使用量超过90GB。

二、内存全景

如前述,NameNode整个内存结构大致可以分成四大部分:Namespace、BlocksMap、NetworkTopology及其它,图2为各数据结构内存逻辑分布图示。

namenode内存全景图

Namespace:维护整个文件系统的目录树结构及目录树上的状态变化;
BlockManager:维护整个文件系统中与数据块相关的信息及数据块的状态变化;
NetworkTopology:维护机架拓扑及DataNode信息,机架感知的基础;
其它:
LeaseManager:读写的互斥同步就是靠Lease实现,支持HDFS的Write-Once-Read-Many的核心数据结构;
CacheManager:Hadoop 2.3.0引入的集中式缓存新特性,支持集中式缓存的管理,实现memory-locality提升读性能;
SnapshotManager:Hadoop 2.1.0引入的Snapshot新特性,用于数据备份、回滚,以防止因用户误操作导致集群出现数据问题;
DelegationTokenSecretManager:管理HDFS的安全访问;
另外还有临时数据信息、统计信息metrics等等。

NameNode常驻内存主要被Namespace和BlockManager使用,二者使用占比分别接近50%。其它部分内存开销较小且相对固定,与Namespace和BlockManager相比基本可以忽略。

三、内存分析

3.1 Namespace

与单机文件系统相似,HDFS对文件系统的目录结构也是按照树状结构维护,Namespace保存了目录树及每个目录/文件节点的属性。除在内存常驻外,这部分数据会定期flush到持久化设备上,生成一个新的FsImage文件,方便NameNode发生重启时,从FsImage及时恢复整个Namespace。图3所示为Namespace内存结构。前述集群中目录和文件总量即整个Namespace目录树中包含的节点总数,可见Namespace本身其实是一棵非常巨大的树。

Namespace内存结构

在整个Namespace目录树中存在两种不同类型的INode数据结构:INodeDirectory和INodeFile。其中INodeDirectory标识的是目录树中的目录,INodeFile标识的是目录树中的文件。由于二者均继承自INode,所以具备大部分相同的公共信息INodeWithAdditionalFields,除常用基础属性外,其中还提供了扩展属性features,如Quota、Snapshot等均通过Feature增加,如果以后出现新属性也可通过Feature方便扩展。不同的是,INodeFile特有的标识副本数和数据块大小组合的header(2.6.1之后又新增了标识存储策略ID的信息)及该文件包含的有序Blocks数组;INodeDirectory则特有子节点的列表children。这里需要特别说明children是默认大小为5的ArrayList,按照子节点name有序存储,虽然在插入时会损失一部分写性能,但是可以方便后续快速二分查找提高读性能,对一般存储系统,读操作比写操作占比要高。具体的继承关系见图4所示。

INode继承关系

3.2 BlockManager

BlocksMap在NameNode内存空间占据很大比例,由BlockManager统一管理,相比Namespace,BlockManager管理的这部分数据要复杂的多。Namespace与BlockManager之间通过前面提到的INodeFile有序Blocks数组关联到一起。图5所示BlockManager管理的内存结构。

BlockManager管理的内存结构

每一个INodeFile都会包含数量不等的Block,具体数量由文件大小及每一个Block大小(默认为64M)比值决定,这些Block按照所在文件的先后顺序组成BlockInfo数组,如图5所示的BlockInfo[A~K],BlockInfo维护的是Block的元数据,结构如图6所示,数据本身是由DataNode管理,所以BlockInfo需要包含实际数据到底由哪些DataNode管理的信息,这里的核心是名为triplets的Object数组,大小为3*replicas,其中replicas是Block副本数量。triplets包含的信息:

  • triplets[i]:Block所在的DataNode;
  • triplets[i+1]:该DataNode上前一个Block;
  • triplets[i+2]:该DataNode上后一个Block;

其中i表示的是Block的第i个副本,i取值[0,replicas)。

BlockInfo继承关系

从前面描述可以看到BlockInfo几块重要信息:文件包含了哪些Block,这些Block分别被实际存储在哪些DataNode上,DataNode上所有Block前后链表关系。

如果从信息完整度来看,以上数据足够支持所有关于HDFS文件系统的正常操作,但还存在一个使用场景较多的问题:不能通过blockid快速定位Block,所以引入了BlocksMap。

BlocksMap底层通过LightWeightGSet实现,本质是一个链式解决冲突的哈希表。为了避免rehash过程带来的性能开销,初始化时,索引空间直接给到了整个JVM可用内存的2%,并且不再变化。集群启动过程,DataNode会进行BR(BlockReport),根据BR的每一个Block计算其HashCode,之后将对应的BlockInfo插入到相应位置逐渐构建起来巨大的BlocksMap。前面在INodeFile里也提到的BlockInfo集合,如果我们将BlocksMap里的BlockInfo与所有INodeFile里的BlockInfo分别收集起来,可以发现两个集合完全相同,事实上BlocksMap里所有的BlockInfo就是INodeFile中对应BlockInfo的引用;通过Block查找对应BlockInfo时,也是先对Block计算HashCode,根据结果快速定位到对应的BlockInfo信息。至此涉及到HDFS文件系统本身元数据的问题基本上已经解决了。

前面提到部分都属于静态数据部分,NameNode内存中所有数据都要随读写情况发生变化,BlockManager当然也需要管理这部分动态数据。主要是当Block发生变化不符合预期时需要及时调整Blocks的分布。这里涉及几个核心的数据结构:

excessReplicateMap:若某个Block实际存储的副本数多于预设副本数,这时候需要删除多余副本,这里多余副本会被置于excessReplicateMap中。excessReplicateMap是从DataNode的StorageID到Block集合的映射集。
neededReplications:若某个Block实际存储的副本数少于预设副本数,这时候需要补充缺少副本,这里哪些Block缺少多少个副本都统一存在neededReplications里,本质上neededReplications是一个优先级队列,缺少副本数越多的Block之后越会被优先处理。
invalidateBlocks:若某个Block即将被删除,会被置于invalidateBlocks中。invalidateBlocks是从DataNode的StorageID到Block集合的映射集。如某个文件被客户端执行了删除操作,该文件所属的所有Block会先被置于invalidateBlocks中。
corruptReplicas:有些场景Block由于时间戳/长度不匹配等等造成Block不可用,会被暂存在corruptReplicas中,之后再做处理。

前面几个涉及到Block分布情况动态变化的核心数据结构,这里的数据实际上是过渡性质的,BlockManager内部的ReplicationMonitor线程(图5标识Thread/Monitor)会持续从其中取出数据并通过逻辑处理后分发给具体的DatanodeDescriptor对应数据结构(3.3 NetworkTopology里会有简单介绍),当对应DataNode的心跳过来之后,NameNode会遍历DatanodeDescriptor里暂存的数据,将其转换成对应指令返回给DataNode,DataNode收到任务并执行完成后再反馈回NameNode,之后DatanodeDescriptor里对应信息被清除。如BlockB预设副本数为3,由于某种原因实际副本变成4(如之前下线的DataNode D重新上线,其中B正好有BlockB的一个副本数据),BlockManager能及时发现副本变化,并将多余的DataNode D上BlockB副本放置到excessReplicateMap中,ReplicationMonitor线程定期检查时发现excessReplicateMap中数据后将其移到DataNode D对应DatanodeDescriptor中invalidateBlocks里,当DataNode D下次心跳过来后,随心跳返回删除Block B的指令,DataNode D收到指令实际删除其上的Block B数据并反馈回NameNode,此后BlockManager将DataNode D上的Block B从内存中清除,至此Block B的副本符合预期,整个流程如图7所示。

副本数异常时处理过程

3.3 NetworkTopology

前面多次提到Block与DataNode之间的关联关系,事实上NameNode确实还需要管理所有DataNode,不仅如此,由于数据写入前需要确定数据块写入位置,NameNode还维护着整个机架拓扑NetworkTopology。图8所示内存中机架拓扑图。

NetworkTopology内存结构

从图8可以看出这里包含两个部分:机架拓扑结构NetworkTopology和DataNode节点信息。其中树状的机架拓扑是根据机架感知(一般都是外部脚本计算得到)在集群启动完成后建立起来,整个机架的拓扑结构在NameNode的生命周期内一般不会发生变化;另一部分是比较关键的DataNode信息,BlockManager已经提到每一个DataNode上的Blocks集合都会形成一个双向链表,更准确的应该是DataNode的每一个存储单元DatanodeStorageInfo上的所有Blocks集合会形成一个双向链表,这个链表的入口就是机架拓扑结构叶子节点即DataNode管理的DatanodeStorageInfo。此外由于上层应用对数据的增删查随时发生变化,随之DatanodeStorageInfo上的Blocks也会动态变化,所以NetworkTopology上的DataNode对象还会管理这些动态变化的数据结构,如replicateBlocks/recoverBlocks/invalidateBlocks,这些数据结构正好和BlockManager管理的动态数据结构对应,实现了数据的动态变化由BlockManager传达到DataNode内存对象最后通过指令下达到物理DataNode实际执行的流动过程,流程在3.2 BlockManager已经介绍。

这里存在一个问题,为什么DatanodeStorageInfo下所有Block之间会以双向链表组织,而不是其它数据结构?如果结合实际场景就不难发现,对每一个DatanodeStorageInfo下Block的操作集中在快速增加/删除(Block动态增减变化)及顺序遍历(BlockReport期间),所以双向链表是非常合适的数据结构。

3.4 LeaseManager

Lease 机制是重要的分布式协议,广泛应用于各种实际的分布式系统中。HDFS支持Write-Once-Read-Many,对文件写操作的互斥同步靠Lease实现。Lease实际上是时间约束锁,其主要特点是排他性。客户端写文件时需要先申请一个Lease,一旦有客户端持有了某个文件的Lease,其它客户端就不可能再申请到该文件的Lease,这就保证了同一时刻对一个文件的写操作只能发生在一个客户端。NameNode的LeaseManager是Lease机制的核心,维护了文件与Lease、客户端与Lease的对应关系,这类信息会随写数据的变化实时发生对应改变。

LeaseManager的内存数据结构

图9所示为LeaseManager内存结构,包括以下三个主要核心数据结构:

sortedLeases:Lease集合,按照时间先后有序组织,便于检查Lease是否超时;
leases:客户端到Lease的映射关系;
sortedLeasesByPath:文件路径到Lease的映射关系;

其中每一个写数据的客户端会对应一个Lease,每个Lease里包含至少一个标识文件路径的Path。Lease本身已经维护了其持有者(客户端)及该Lease正在操作的文件路径集合,之所以增加了leases和sortedLeasesByPath为提高通过Lease持有者或文件路径快速索引到Lease的性能。

由于Lease本身的时间约束特性,当Lease发生超时后需要强制回收,内存中与该Lease相关的内容要被及时清除。超时检查及超时后的处理逻辑由LeaseManager.Monitor统一执行。LeaseManager中维护了两个与Lease相关的超时时间:软超时(softLimit)和硬超时(hardLimit),使用场景稍有不同。

正常情况下,客户端向集群写文件前需要向NameNode的LeaseManager申请Lease;写文件过程中定期更新Lease时间,以防Lease过期,周期与softLimit相关;写完数据后申请释放Lease。整个过程可能发生两类问题:(1)写文件过程中客户端没有及时更新Lease时间;(2)写完文件后没有成功释放Lease。两个问题分别对应为softLimit和hardLimit。两种场景都会触发LeaseManager对Lease超时强制回收。如果客户端写文件过程中没有及时更新Lease超过softLimit时间后,另一客户端尝试对同一文件进行写操作时触发Lease软超时强制回收;如果客户端写文件完成但是没有成功释放Lease,则会由LeaseManager的后台线程LeaseManager.Monitor检查是否硬超时后统一触发超时回收。不管是softLimit还是hardLimit超时触发的强制Lease回收,处理逻辑都一样:FSNamesystem.internalReleaseLease,逻辑本身比较复杂,这里不再展开,简单的说先对Lease过期前最后一次写入的Block进行检查和修复,之后释放超时持有的Lease,保证后面其它客户端的写入能够正常申请到该文件的Lease。

NameNode内存数据结构非常丰富,这里对几个重要的数据结构进行了简单的描述,除了前面罗列之外,其实还有如SnapShotManager/CacheManager等,由于其内存占用有限且有一些特性还尚未稳定,这里不再展开。

四、问题

随着集群中数据规模的不断积累,NameNode内存占用随之成比例增长。不可避免的NameNode内存将逐渐成为集群发展的瓶颈,并开始暴漏诸多问题。

1、启动时间变长。NameNode的启动过程可以分成FsImage数据加载、editlogs回放、Checkpoint、DataNode的BlockReport几个阶段。数据规模较小时,启动时间可以控制在~10min以内,当元数据规模达到5亿(Namespace中INode数超过2亿,Block数接近3亿),FsImage文件大小将接近到20GB,加载FsImage数据就需要~14min,Checkpoint需要~6min,再加上其它阶段整个重启过程将持续~50min,极端情况甚至超过60min,虽然经过多轮优化重启过程已经能够稳定在~30min,但也非常耗时。如果数据规模继续增加,启动过程将同步增加。

2、性能开始下降。HDFS文件系统的所有元数据相关操作基本上均在NameNode端完成,当数据规模的增加致内存占用变大后,元数据的增删改查性能会出现下降,且这种下降趋势会因规模效应及复杂的处理逻辑被放大,相对复杂的RPC请求(如addblock)性能下降更加明显。

3、NameNode JVM FGC(Full GC)风险较高。主要体现在两个方面:(1)FGC频率增加;(2)FGC时间增加且风险不可控。针对NameNode的应用场景,目前看CMS内存回收算法比较主流,正常情况下,对超过100GB内存进行回收处理时,可以控制到秒级别的停顿时间,但是如果回收失败被降级到串行内存回收时,应用的停顿时间将达到数百秒,这对应用本身是致命的。

4、超大JVM Heap Size调试问题。如果线上集群性能表现变差,不得不通过分析内存才能得到结论时,会成为一件异常困难的事情。且不说Dump本身极其费时费力,Dump超大内存时存在极大概率使NameNode不可服务。

针对NameNode内存增长带来的诸多问题,社区和业界都在持续关注并尝试不同的解决方案。整体上两个思路:(1)扩展NameNode分散单点负载;(2)引入外部系统支持NameNode内存数据。

从2010年开始社区就投入大量精力持续解决,Federation方案[3]通过对NameNode进行水平扩展分散单点负载的方式解决NameNode的问题,经过几年的发展该方案逐渐稳定,目前已经被业界广泛使用。除此之外,社区也在尝试将Namespace存储值外部的KV存储系统如LevelDB[4],从而降低NameNode内存负载。

除社区外,业界也在尝试自己的解决方案。Baidu HDFS2[5]将元数据管理通过主从架构的集群形式提供服务,本质上是将原生NameNode管理的Namespace和BlockManagement进行物理拆分。其中Namespace负责管理整个文件系统的目录树及文件到BlockID集合的映射关系,BlockID到DataNode的映射关系是按照一定的规则分到多个服务节点分布式管理,这种方案与Lustre有相似之处(Hash-based Partition)。Taobao HDFS2[6]尝试过采用另外的思路,借助高速存储设备,将元数据通过外存设备进行持久化存储,保持NameNode完全无状态,实现NameNode无限扩展的可能。其它类似的诸多方案不一而足。

尽管社区和业界均对NameNode内存瓶颈有成熟的解决方案,但是不一定适用所有的场景,尤其是中小规模集群。结合实践过程和集群规模发展期可能遇到的NameNode内存相关问题这里有几点建议:

  1. 合并小文件。正如前面提到,目录/文件和Block均会占用NameNode内存空间,大量小文件会降低内存使用效率;另外,小文件的读写性能远远低于大文件的读写,主要原因对小文件读写需要在多个数据源切换,严重影响性能。

  2. 调整合适的BlockSize。主要针对集群内文件较大的业务场景,可以通过调整默认的Block Size大小(参数:dfs.blocksize,默认128M),降低NameNode的内存增长趋势。

  3. HDFS Federation方案。当集群和数据均达到一定规模时,仅通过垂直扩展NameNode已不能很好的支持业务发展,可以考虑HDFS Federation方案实现对NameNode的水平扩展,在解决NameNode的内存问题的同时通过Federation可以达到良好的隔离性,不会因为单一应用压垮整集群。

五、总结

NameNode在整个HDFS系统架构中占据举足轻重的位置,内部数据和处理逻辑相对复杂,本文简单梳理了NameNode的内存全景及对其中几个关键数据结构,从NameNode内存核心数据视角对NameNode进行了简单的解读,并结合实际场景介绍了随着数据规模的增加,NameNode内存可能遇到的问题及业界各种可借鉴的解决方案。在后续的《HDFS NameNode内存详解》中,我们会详细解读NameNode的几个关键数据结构,分析各数据结构在JVM Heap使用占比情况。

六、参考

[1] Apache Hadoop, 2016, https://hadoop.apache.org/.
[2] Apache Hadoop Source Code, 2014, https://github.com/apache/hadoop/tree/branch-2.4.1/.
[3] HDFS Federation, 2011, https://issues.apache.org/jira/browse/HDFS-1052.
[4] NemeNode Scalability, 2013, https://issues.apache.org/jira/browse/HDFS-5389.
[5] Baidu HDFS2, 2013, http://static.zhizuzhefu.com/wordpress_cp/uploads/2013/04/a9.pdf.
[6] Taobao HDFS2, 2012, https://github.com/taobao/ADFS.

原文出自:https://tech.meituan.com/archives

分布式队列编程优化篇

美团点评阅读(3182)

前言

“分布式队列编程”是一个系列文,之前我们已经发布了《分布式队列编程模型、实战》,主要剖析了分布式队列编程模型的需求来源、定义、结构以及其变化多样性;根据作者在新美大实际工作经验,给出了队列式编程在分布式环境下的一些具体应用。本文将重点阐述工程师运用分布式队列编程构架的时候,在生产者、分布式队列以及消费者这三个环节的注意点以及优化建议。

确定采用分布式队列编程模型之后,主体架构就算完成了,但工程师的工作还远远未结束。天下事必做于细,细节是一个不错的架构向一个优秀的系统进阶的关键因素。优化篇选取了作者以及其同事在运用分布式队列编程模型架构时所碰到的典型问题和解决方案。这里些问题出现的频率较高,如果你经验不够,很可能会“踩坑”。希望通过这些讲解,帮助读者降低分布式队列编程模型的使用门槛。本文将对分布式队列编程模型的三种角色:生产者(Producer),分布式队列(Queue),消费者(Consumer)分别进行优化讨论。

生产者优化

在分布式队列编程中,生产者往往并非真正的生产源头,只是整个数据流中的一个节点,这种生产者的操作是处理-转发(Process-Forward)模式。

这种模式给工程师们带来的第一个问题是吞吐量问题。这种模式下运行的生产者,一边接收上游的数据,一边将处理完的数据发送给下游。本质上,它是一个非常经典的数学问题,其抽象模型是一些没有盖子的水箱,每个水箱接收来自上一个水箱的水,进行处理之后,再将水发送到下一个水箱。工程师需要预测水源的流量、每个环节水箱的处理能力、水龙头的排水速度,最终目的是避免水溢出水箱,或者尽可能地减小溢出事件的概率。实际上流式编程框架以及其开发者花了大量的精力去处理和优化这个问题。下文的缓存优化和批量写入优化都是针对该问题的解决方案。

第二个需要考虑的问题是持久化。由于各种原因,系统总是会宕机。如果信息比较敏感,例如计费信息、火车票订单信息等,工程师们需要考虑系统宕机所带来的损失,找到让损失最小化的解决方案。持久化优化重点解决这一类问题。

缓存优化

处于“处理-转发”模式下运行的生产者往往被设计成请求驱动型的服务,即每个请求都会触发一个处理线程,线程处理完后将结果写入分布式队列。如果由于某种原因队列服务不可用,或者性能恶化,随着新请求的到来,生产者的处理线程就会产生堆积。这可能会导致如下两个问题:

  • 系统可用性降低。由于每个线程都需要一定的内存开销,线程过多会使系统内存耗尽,甚至可能产生雪崩效应导致最终完全不可用。
  • 信息丢失。为了避免系统崩溃,工程师可能会给请求驱动型服务设置一个处理线程池,设置最大处理线程数量。这是一种典型的降级策略,目的是为了系统崩溃。但是,后续的请求会因为没有处理线程而被迫阻塞,最终可能产生信息丢失。例如:对于广告计费采集,如果采集系统因为线程耗尽而不接收客户端的计费行为,这些计费行为就会丢失。

缓解这类问题的思路来自于CAP理论,即通过降低一致性来提高可用性。生产者接收线程在收到请求之后第一时间不去处理,直接将请求缓存在内存中(牺牲一致性),而在后台启动多个处理线程从缓存中读取请求、进行处理并写入分布式队列。与线程所占用的内存开销相比,大部分的请求所占内存几乎可以忽略。通过在接收请求和处理请求之间增加一层内存缓存,可以大大提高系统的处理吞吐量和可扩展性。这个方案本质上是一个内存生产者消费者模型。

批量写入优化

如果生产者的请求过大,写分布式队列可能成为性能瓶颈,有如下几个因素:

  • 队列自身性能不高。
  • 分布式队列编程模型往往被应用在跨机房的系统里面,跨机房的网络开销往往容易成为系统瓶颈。
  • 消息确认机制往往会大大降低队列的吞吐量以及响应时间。

如果在处理请求和写队列之间添加一层缓存,消息写入程序批量将消息写入队列,可以大大提高系统的吞吐量。原因如下:

  • 批量写队列可以大大减少生产者和分布式队列的交互次数和消息传输量。特别是对于高吞吐小载荷的消息实体,批量写可以显著降低网络传输量。
  • 对于需要确认机制的消息,确认机制往往会大大降低队列的吞吐量以及响应时间,某些高敏感的消息需要多个消息中间件代理同时确认,这近一步恶化性能。在生产者的应用层将多条消息批量组合成一个消息体,消息中间件就只需要对批量消息进行一次确认,这可能会数量级的提高消息传输性能。

持久化优化

通过添加缓存,消费者服务的吞吐量和可用性都得到了提升。但缓存引入了一个新问题——内存数据丢失。对于敏感数据,工程师需要考虑如下两个潜在问题:

  • 如果内存中存在未处理完的请求,而某些原因导致生产者服务宕机,内存数据就会丢失而可能无法恢复。
  • 如果分布式队列长时间不可用,随着请求数量的不断增加,最终系统内存可能会耗尽而崩溃,内存的消息也可能丢失。

所以缓存中的数据需要定期被持久化到磁盘等持久层设备中,典型的持久化触发策略主要有两种:

  • 定期触发,即每隔一段时间进行一次持久化。
  • 定量触发,即每当缓存中的请求数量达到一定阈值后进行持久化。
    是否需要持久化优化,以及持久化策略应该由请求数据的敏感度、请求量、持久化性能等因素共同决定。

中间件选型

分布式队列不等同于各种开源的或者收费的消息中间件,甚至在一些场景下完全不需要使用消息中间件。但是,消息中间件产生的目的就是解决消息传递问题,这为分布式队列编程架构提供了很多的便利。在实际工作中,工程师们应该将成熟的消息中间件作为队列的首要备选方案。
本小节对消息中间件的功能、模型进行阐述,并给出一些消息中间件选型、部署的具体建议。

中间件的功能

明白一个系统的每个具体功能是设计和架构一个系统的基础。典型的消息中间件主要包含如下几个功能:

  • 消息接收
  • 消息分发
  • 消息存储
  • 消息读取

概念模型

抽象的消息中间件模型包含如下几个角色:

  • 发送者和接收者客户端(Sender/Receiver Client),在具体实施过程中,它们一般以库的形式嵌入到应用程序代码中。
  • 代理服务器(Broker Server),它们是与客户端代码直接交互的服务端代码。
  • 消息交换机(Exchanger),接收到的消息一般需要通过消息交换机(Exchanger)分发到具体的消息队列中。
  • 消息队列,一般是一块内存数据结构或持久化数据。
    概念模型如下图:
    消息队列中间件
    为了提高分发性能,很多消息中间件把消息代理服务器的拓扑图发送到发送者和接收者客户端(Sender/Receiver Client),如此一来,发送源可以直接进行消息分发。

选型标准

要完整的描述消息中间件各个方面非常困难,大部分良好的消息中间件都有完善的文档,这些文档的长度远远超过本文的总长度。但如下几个标准是工程师们在进行消息中间件选型时经常需要考虑和权衡的。

性能

性能主要有两个方面需要考虑:吞吐量(Throughput)和响应时间(Latency)。
不同的消息队列中间件的吞吐量和响应时间相差甚远,在选型时可以去网上查看一些性能对比报告。
对于同一种中间件,不同的配置方式也会影响性能。主要有如下几方面的配置:

  • 是否需要确认机制,即写入队列后,或从队列读取后,是否需要进行确认。确认机制对响应时间的影响往往很大。
  • 能否批处理,即消息能否批量读取或者写入。批量操作可以大大减少应用程序与消息中间件的交互次数和消息传递量,大大提高吞吐量。
  • 能否进行分区(Partition)。将某一主题消息队列进行分区,同一主题消息可以有多台机器并行处理。这不仅仅能影响消息中间件的吞吐量,还决定着消息中间件是否具备良好的可伸缩性(Scalability)。
  • 是否需要进行持久化。将消息进行持久化往往会同时影响吞吐量和响应时间。

可靠性

可靠性主要包含:可用性、持久化、确认机制等。
高可用性的消息中间件应该具备如下特征:

  • 消息中间件代理服务器(Broker)具有主从备份。即当一台代理服务宕机之后,备用服务器能接管相关的服务。
  • 消息中间件中缓存的消息是否有备份、并持久化。
    根据CAP理论,高可用、高一致性以及网络分裂不可兼得。根据作者的观察,大部分的消息中间件在面临网络分裂的情况下下,都很难保证数据的一致性以及可用性。 很多消息中间件都会提供一些可配置策略,让使用者在可用性和一致性之间做权衡。

高可靠的消息中间件应该确保从发送者接收到的消息不会丢失。中间件代理服务器的宕机并不是小概率事件,所以保存在内存中的消息很容易发生丢失。大部分的消息中间件都依赖于消息的持久化去降低消息丢失损失,即将接收到的消息写入磁盘。即使提供持久化,仍有两个问题需要考虑:

  • 磁盘损坏问题。长时间来看,磁盘出问题的概率仍然存在。
  • 性能问题。与操作内存相比,磁盘I/O的操作性能要慢几个数量级。频繁持久化不仅会增加响应时间,也会降低吞吐量。
    解决这两个问题的一个解决方案就是:多机确认,定期持久化。即消息被缓存在多台机器的内存中,只有每台机器都确认收到消息,才跟发送者确认(很多消息中间件都会提供相应的配置选项,让用户设置最少需要多少台机器接收到消息)。由于多台独立机器同时出故障的概率遵循乘法法则,指数级降低,这会大大提高消息中间件的可靠性。

确认机制本质上是通讯的握手机制(Handshaking)。如果没有该机制,消息在传输过程中丢失将不会被发现。高敏感的消息要求选取具备确认机制的消息中间件。当然如果没有接收到消息中间件确认完成的指令,应用程序需要决定如何处理。典型的做法有两个:

  • 多次重试。
  • 暂存到本地磁盘或其它持久化媒介。

客户端接口所支持语言

采用现存消息中间件就意味着避免重复造轮子。如果某个消息中间件未能提供对应语言的客户端接口,则意味着极大的成本和兼容性问题。

投递策略(Delivery policies)

投递策略指的是一个消息会被发送几次。主要包含三种策略:最多一次(At most Once )、最少一次(At least Once)、仅有一次(Exactly Once)。
在实际应用中,只考虑消息中间件的投递策略并不能保证业务的投递策略,因为接收者在确认收到消息和处理完消息并持久化之间存在一个时间窗口。例如,即使消息中间件保证仅有一次(Exactly Once),如果接收者先确认消息,在持久化之前宕机,则该消息并未被处理。从应用的角度,这就是最多一次(At most Once)。反之,接收者先处理消息并完成持久化,但在确认之前宕机,消息就要被再次发送,这就是最少一次(At least Once)。 如果消息投递策略非常重要,应用程序自身也需要仔细设计。

消费者优化

消费者是分布式队列编程中真正的数据处理方,数据处理方最常见的挑战包括:有序性、串行化(Serializability)、频次控制、完整性和一致性等。

挑战

有序性

在很多场景下,如何保证队列信息的有序处理是一个棘手的问题。如下图,假定分布式队列保证请求严格有序,请求ri2和ri1都是针对同一数据记录的不同状态,ri2的状态比ri1的状态新。T1、T2、T3和T4代表各个操作发生的时间,并且 T1 < T2 < T3 < T4(”<“代表早于)。
采用多消费者架构,这两条记录被两个消费者(Consumer1和Consumer2)处理后更新到数据库里面。Consumer1虽然先读取ri1但是却后写入数据库,这就导致,新的状态被老的状态覆盖,所以多消费者不保证数据的有序性。
有序性

串行化

很多场景下,串行化是数据处理的一个基本需求,这是保证数据完整性、可恢复性、事务原子性等的基础。为了在并行计算系统里实现串行化,一系列的相关理论和实践算法被提出。对于分布式队列编程架构,要在在多台消费者实现串行化非常复杂,无异于重复造轮子。

频次控制

有时候,消费者的消费频次需要被控制,可能的原因包括:

  • 费用问题。如果每次消费所引起的操作都需要收费,而同一个请求消息在队列中保存多份,不进行频次控制,就会导致无谓的浪费。
  • 性能问题。每次消费可能会引起对其他服务的调用,被调用服务希望对调用量有所控制,对同一个请求消息的多次访问就需要有所控制。

完整性和一致性

完整性和一致性是所有多线程和多进程的代码都面临的问题。在多线程或者多进程的系统中考虑完整性和一致性往往会大大地增加代码的复杂度和系统出错的概率。

单例服务优化

几乎所有串行化理论真正解决的问题只有一个:性能。 所以,在性能允许的前提下,对于消费者角色,建议采用单实例部署。通过单实例部署,有序性、串行化、完整性和一致性问题自动获得了解决。另外,单实例部署的消费者拥有全部所需信息,它可以在频次控制上采取很多优化策略。

天下没有免费的午餐。同样,单实例部署并非没有代价,它意味着系统可用性的降低,很多时候,这是无法接受的。解决可用性问题的最直接的思路就是冗余(Redundancy)。最常用的冗余方案是Master-slave架构,不过大部分的Master-slave架构都是Active/active模式,即主从服务器都提供服务。例如,数据库的Master-slave架构就是主从服务器都提供读服务,只有主服务器提供写服务。大部分基于负载均衡设计的Master-slave集群中,主服务器和从服务器同时提供相同的服务。这显然不满足单例服务优化需求。有序性和串行化需要Active/passive架构,即在某一时刻只有主实例提供服务,其他的从服务等待主实例失效。这是典型的领导人选举架构,即只有获得领导权的实例才能充当实际消费者,其他实例都在等待下一次选举。采用领导人选举的Active/passive架构可以大大缓解纯粹的单实例部署所带来的可用性问题。

令人遗憾的是,除非工程师们自己在消费者实例里面实现Paxos等算法,并在每次消息处理之前都执行领导人选举。否则,理论上讲,没有方法可以保障在同一个时刻只有一个领导者。而对每个消息都执行一次领导人选举,显然性能不可行。实际工作中,最容易出现的问题时机发生在领导人交接过程中,即前任领导人实例变成辅助实例,新部署实例开始承担领导人角色。为了平稳过渡,这两者之间需要有一定的通讯机制,但是,无论是网络分区(Network partition)还是原领导人服务崩溃都会使这种通讯机制变的不可能。

对于完整性和一致性要求很高的系统,我们需要在选举制度和交接制度这两块进行优化。

领导人选举架构

典型的领导人选举算法有Paxos、ZAB( ZooKeeper Atomic Broadcast protocol)。为了避免重复造轮子,建议采用ZooKeeper的分布式锁来实现领导人选举。典型的ZooKeeper实现算法如下(摘自参考资料[4]):

Let ELECTION be a path of choice of the application. To volunteer to be a leader:

1.Create znode z with path “ELECTION/guid-n_” with both SEQUENCE and EPHEMERAL flags;
2.Let C be the children of “ELECTION”, and i be the sequence number of z;
3.Watch for changes on “ELECTION/guid-n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;

Upon receiving a notification of znode deletion:

1.Let C be the new set of children of ELECTION;
2.If z is the smallest node in C, then execute leader procedure;
3.Otherwise, watch for changes on “ELECTION/guid-n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;

领导人交接架构

领导人选举的整个过程发生在ZooKeeper集群中,各个消费者实例在这场选举中只充当被告知者角色(Learner)。领导人选举算法,只能保证最终只有一个Leader被选举出来,并不保障被告知者对Leader的理解是完全一致的。本质上,上文的架构里,选举的结果是作为令牌(Token)传递给消费者实例,消费者将自身的ID与令牌进行对比,如果相等,则开始执行消费操作。所以当发生领导人换届的情况,不同的Learner获知新Leader的时间并不同。例如,前任Leader如果因为网络问题与ZooKeeper集群断开,前任Leader只能在超时后才能判断自己是否不再承担Leader角色了,而新的Leader可能在这之前已经产生。另一方面,即使前任Leader和新Leader同时接收到新Leader选举结果,某些业务的完整性要求迫使前任Leader仍然完成当前未完成的工作。以上的讲解非常抽象,生活中却给了一些更加具体的例子。众所周知,美国总统候选人在选举结束后并不直接担任美国总统,从选举到最终承担总统角色需要一个过渡期。对于新当选Leader的候选人而言,过渡期间称之为加冕阶段(Inauguration)。对于即将卸任的Leader,过渡期称为交接阶段(HandOver)。所以一个基于领导人选举的消费者从加冕到卸任经历三个阶段:Inauguration、Execution、HandOver。在加冕阶段,新领导需要进行一些初始化操作。Execution阶段是真正的队列消息处理阶段。在交接阶段,前任领导需要进行一些清理操作。

类似的,为了解决领导人交接问题,所有的消费者从代码实现的角度都需要实现类似ILeaderCareer接口。这个接口包含三个方发inaugurate(),handOver()和execute()。某个部署实例(Learner)在得知自己承担领导人角色后,需要调用inaugurate()方法,进行加冕。主要的消费逻辑通过不停的执行execute()实现,当确认自己不再承担领导人之后,执行handOver()进行交接。

public interface ILeaderCareer {
    public void inaugurate();
    public void handOver();
    public boolean execute();
}

如果承担领导人角色的消费者,在执行execute()阶段得知自己将要下台,根据消息处理的原子性,该领导人可以决定是否提前终止操作。如果整个消息处理是一个原子性事务,直接终止该操作可以快速实现领导人换届。否则,前任领导必须完成当前消息处理后,才进入交接阶段。这意味着新的领导人,在inaugurate()阶段需要进行一定时间的等待。

排重优化

频次控制是一个经典问题。对于分布式队列编程架构,相同请求重复出现在队列的情况并不少见。如果相同请求在队列中重复太多,排重优化就显得很必要。分布式缓存更新是一个典型例子,所有请求都被发送到队列中用于缓存更新。如果请求符合典型的高斯分布,在一段时间内会出现大量重复的请求,而同时多线程更新同一请求缓存显然没有太大的意义。
排重优化是一个算法,其本质是基于状态机的编程,整个讲解通过模型、构思和实施三个步骤完成。

模型

进行排重优化的前提是大量重复的请求。在模型这一小节,我们首先阐述重复度模型、以及不同重复度所导致的消费模型,最后基于这两个模型去讲解排重状态机。

重复度模型

首先我们给出最小重复长度的概念。同一请求最小重复长度:同一请求在队列中的重复出现的最小间距。例如,请求ri第一次出现在位置3,第二次出现在10,最小重复长度等于7。
是否需要进行排重优化取决于队列中请求的重复度。由于不同请求之间并不存在重复的问题,不失一般性,这里的模型只考了单个请求的重复度,重复度分为三个类:无重复、稀疏重复、高重复。
无重复:在整个请求过程,没有任何一个请求出现一次以上。
稀疏重复:主要的请求最小重复长度大于消费队列长度。
高重复:大量请求最小重复长度小于消费队列长度。
对于不同的重复度,会有不同的消费模型。

无重复消费模型

在整个队列处理过程中,所有的请求都不相同,如下图:
无重复消费模型

稀疏重复消费模型

当同一请求最小重复长度大于消费者队列长度,如下图。假定有3个消费者,Consumer1将会处理r1,Consumer2将会处理r2,Consumer3将会处理r3,如果每个请求处理的时间严格相等,Consumer1在处理完r1之后,接着处理r4,Consumer2将会处理r2之后会处理r1。虽然r1被再次处理,但是任何时刻,只有这一个消费者在处理r1,不会出现多个消费者同时处理同一请求的场景。
稀疏重复消费模型

高重复消费模型

如下图,仍然假定有3个消费者,队列中前面4个请求都是r1,它会同时被3个消费者线程处理:
高重复消费模型
显然,对于无重复和稀疏重复的分布式队列,排重优化并不会带来额外的好处。排重优化所针对的对象是高重复消费模型,特别是对于并行处理消费者比较多的情况,重复处理同一请求,资源消耗极大。

排重状态机

排重优化的主要对象是高重复的队列,多个消费者线程或进程同时处理同一个幂等请求只会浪费计算资源并延迟其他待请求处理。所以,排重状态机的一个目标是处理唯一性,即:同一时刻,同一个请求只有一个消费者处理。如果消费者获取一条请求消息,但发现其他消费者正在处理该消息,则当前消费者应该处于等待状态。如果对同一请求,有一个消费者在处理,一个消费者在等待,而同一请求再次被消费者读取,再次等待则没有意义。所以,状态机的第二个目标是等待唯一性,即:同一时刻,同一个请求最多只有一个消费者处于等待状态。总上述,状态机的目标是:处理唯一性和等待唯一性。我们把正在处理的请求称为头部请求,正在等待的请求称为尾部请求。
由于状态机的处理单元是请求,所以需要针对每一个请求建立一个排重状态机。基于以上要求,我们设计的排重状态机包含4个状态Init,Process,Block,Decline。各个状态之间转化过程如下图:
排重状态机

  1. 状态机创建时处于Init状态。
  2. 对Init状态进行Enqueue操作,即接收一个请求,开始处理(称为头部请求),状态机进入Process状态。
  3. 状态机处于Process状态,表明当前有消费者正在处理头部请求。此时,如果进行Dequeue操作,即头部请求处理完成,返回Init状态。如果进行Enqueue操作,即另一个消费者准备处理同一个请求,状态机进入Block状态(该请求称为尾部请求)。
  4. 状态机处于Block状态,表明头部请求正在处理,尾部请求处于阻塞状态。此时,进行Dequeue操作,即头部请求处理完成,返回Process状态,并且尾部请求变成头部请求,原尾部请求消费者结束阻塞状态,开始处理。进行Enqueue操作,表明一个新的消费者准备处理同一个请求,状态机进入Decline状态。
  5. 状态机进入Decline状态,根据等待唯一性目标,处理最新请求的消费者将被抛弃该消息,状态机自动转换回Block状态。

构思

状态机描述的是针对单个请求操作所引起状态变化,排重优化需要解决队列中所有请求的排重问题,需要对所有请求的状态机进行管理。这里只考虑单虚拟机内部对所有请求状态机的管理,对于跨虚拟机的管理可以采用类似的方法。对于多状态机管理主要包含三个方面:一致性问题、完整性问题和请求缓存驱逐问题。

一致性问题

一致性在这里要求同一请求的不同消费者只会操作一个状态机。由于每个请求都产生一个状态机,系统将会包含大量的状态机。为了兼顾性能和一致性,我们采用ConcurrentHashMap保存所有的状态机。用ConcurrentHashMap而不是对整个状态机队列进行加锁,可以提高并行处理能力,使得系统可以同时操作不同状态机。为了避免处理同一请求的多消费者线程同时对ConcurrentHashMap进行插入所导致状态机不一致问题,我们利用了ConcurrentHashMap的putIfAbsent()方法。代码方案如下,key2Status用于存储所有的状态机。消费者在处理请求之前,从状态机队列中读取排重状态机TrafficAutomate。如果没有找到,则创建一个新的状态机,并通过putIfAbsent()方法插入到状态机队列中。

private ConcurrentHashMap<T, TrafficAutomate> key2Status = new ConcurrentHashMap();
TrafficAutomate trafficAutomate = key2Status.get(key);
if(trafficAutomate == null)
{
    trafficAutomate = new TrafficAutomate();
    TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);
    if(oldAutomate != null)
    {
        trafficAutomate = oldAutomate;
    }
}

完整性问题

完整性要求保障状态机Init,Process,Block,Decline四种状态正确、状态之间的转换也正确。由于状态机的操作非常轻量级,兼顾完整性和降低代码复杂度,我们对状态机的所有方法进行加锁。

请求缓存驱逐问题(Cache Eviction)

如果不同请求的数量太多,内存永久保存所有请求的状态机的内存开销太大。所以,某些状态机需要在恰当的时候被驱逐出内存。这里有两个思路:

  • 当状态机返回Init状态时,清除出队列。
  • 启动一个后台线程,定时扫描状态机队列,采用LRU等标准缓存清除机制。

标识问题

每个请求对应于一个状态机,不同的状态机采用不同的请求进行识别。
对于同一状态机的不同消费者,在单虚拟机方案中,我们采用线程id进行标识。

实施

排重优化的主要功能都是通过排重状态机(TrafficAutomate)和状态机队列(QueueCoordinator)来实施的。排重状态机描述的是针对单个请求的排重问题,状态机队列解决所有请求状态机的排重问题。

状态机实施(TrafficAutomate)

根据状态机模型,其主要操作为enQueue和deQueue,其状态由头部请求和尾部请求的状态共同决定,所以需要定义两个变量为head和tail,用于表示头部请求和尾部请求。为了确保多线程操作下状态机的完整性(Integraty),所有的操作都将加上锁。

enQueue操作

当一个消费者执行enQueue操作时:如果此时尾部请求不为空,根据等待唯一性要求,返回DECLINE,当前消费者应该抛弃该请求;如果头部请求为空,返回ACCPET,当前消费者应该立刻处理该消息;否则,返回BLOCK,该消费者应该等待,并不停的查看状态机的状态,一直到头部请求处理完成。enQueue代码如下:

synchronized ActionEnum enQueue(long id)
{ 
    if(tail != INIT_QUEUE_ID)
    {
        return DECLINE;
    }

    if(head == INIT_QUEUE_ID)
    {
        head = id;
        return ACCEPT;
    }
    else
    {
        tail = id;
        return BLOCK;
    }
}
deQueue操作

对于deQueue操作,首先将尾部请求赋值给头部请求,并将尾部请求置为无效。deQueue代码如下:

synchronized boolean deQueue(long id)
{
        head = tail;
        tail = INIT_QUEUE_ID;
        return true;
}

状态机队列实施(QueueCoordinator)

接口定义

状态机队列集中管理所有请求的排重状态机,所以其操作和单个状态机一样,即enQueue和deQueuqe接口。这两个接口的实现需要识别特定请求的状态机,所以它们的入参应该是请求。为了兼容不同类型的请求消息,我们采用了Java泛型编程。接口定义如下:

public interface QueueCoordinator<T> {

    public boolean enQueue(T key);

    public void deQueue(T key);

}
enQueue操作

enQueue操作过程如下:
首先,根据传入的请求key值,获取状态机, 如果不存在则创建一个新的状态机,并保存在ConcurrentHashMap中。
接下来,获取线程id作为该消费者的唯一标识,并对对应状态机进行enQueue操作。
如果状态机返回值为ACCEPT或者DECLINE,返回业务层处理代码,ACCEPT意味着业务层需要处理该消息,DECLINE表示业务层可以抛弃当前消息。如果状态机返回值为Block,则该线程保持等待状态。
异常处理。在某些情况下,头部请求线程可能由于异常,未能对状态机进行deQueue操作(作为组件提供方,不能假定所有的规范被使用方实施)。为了避免处于阻塞状态的消费者无期限地等待,建议对状态机设置安全超时时限。超过了一定时间后,状态机强制清空头部请求,返回到业务层,业务层开始处理该请求。
代码如下:

public boolean enQueue(T key) {
    _loggingStastic();

    TrafficAutomate trafficAutomate = key2Status.get(key);
    if(trafficAutomate == null)
    {
        trafficAutomate = new TrafficAutomate();
        TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);
        if(oldAutomate != null)
        {
            trafficAutomate = oldAutomate;
        }
    }
    long threadId = Thread.currentThread().getId();

    ActionEnum action = trafficAutomate.enQueue(threadId);

    if(action == DECLINE)
    {
        return false;
    }
    else if (action == ACCEPT)
    {
        return true;
    }
    //Blocking status means some other thread are working on this key, so just wait till timeout
    long start = System.currentTimeMillis();
    long span = 0;
    do {
        _nonExceptionSleep(NAP_TIME_IN_MILL);

        if(trafficAutomate.isHead(threadId))
        {
            return true;
        }

        span = System.currentTimeMillis() - start;
    }while(span <= timeout);

    //remove head so that it won't block the queue for too long
    trafficAutomate.evictHeadByForce(threadId);

    return true;
}
deQueue操作

deQueue操作首先从ConcurrentHashMap获取改请求所对应的状态机,接着获取该线程的线程id,对状态机进行deQueue操作。
enQueue代码如下:

public void deQueue(T key) {
    TrafficAutomate trafficAutomate = key2Status.get(key);

    if(trafficAutomate == null)
    {
        logger.error("key {} doesn't exist ", key);
        return;
    }

    long threadId = Thread.currentThread().getId();

    trafficAutomate.deQueue(threadId);
}

源代码

完整源代码可以在QueueCoordinator获取。

参考资料

[1] Rabbit MQ, Highly Available Queues.
[2] IBM Knowledge Center, Introduction to message queuing.
[3] Wikipedia, Serializability.
[4] Hadoop, ZooKeeper Recipes and Solutions.
[5] Apache Kafka.
[6] Lamport L, Paxos Made Simple.

原文出自:https://tech.meituan.com/archives

分布式队列编程:模型、实战

美团点评阅读(3146)

介绍

作为一种基础的抽象数据结构,队列被广泛应用在各类编程中。大数据时代对跨进程、跨机器的通讯提出了更高的要求,和以往相比,分布式队列编程的运用几乎已无处不在。但是,这种常见的基础性的事物往往容易被忽视,使用者往往会忽视两点:

  • 使用分布式队列的时候,没有意识到它是队列。
  • 有具体需求的时候,忘记了分布式队列的存在。

文章首先从最基础的需求出发,详细剖析分布式队列编程模型的需求来源、定义、结构以及其变化多样性。通过这一部分的讲解,作者期望能在两方面帮助读者:一方面,提供一个系统性的思考方法,使读者能够将具体需求关联到分布式队列编程模型,具备进行分布式队列架构的能力;另一方面,通过全方位的讲解,让读者能够快速识别工作中碰到的各种分布式队列编程模型。

文章的第二部分实战篇。根据作者在新美大实际工作经验,给出了队列式编程在分布式环境下的一些具体应用。这些例子的基础模型并非首次出现在互联网的文档中,但是所有的例子都是按照挑战、构思、架构三个步骤进行讲解的。这种讲解方式能给读者一个“从需求出发去构架分布式队列编程”的旅程。

分布式队列编程模型

模型篇从基础的需求出发,去思考何时以及如何使用分布式队列编程模型。建模环节非常重要,因为大部分中高级工程师面临的都是具体的需求,接到需求后的第一个步骤就是建模。通过本篇的讲解,希望读者能够建立起从需求到分布式队列编程模型之间的桥梁。

何时选择分布式队列

通讯是人们最基本的需求,同样也是计算机最基本的需求。对于工程师而言,在编程和技术选型的时候,更容易进入大脑的概念是RPC、RESTful、Ajax、Kafka。在这些具体的概念后面,最本质的东西是“通讯”。所以,大部分建模和架构都需要从“通讯”这个基本概念开始。当确定系统之间有通讯需求的时候,工程师们需要做很多的决策和平衡,这直接影响工程师们是否会选择分布式队列编程模型作为架构。从这个角度出发,影响建模的因素有四个:When、Who、Where、How。

When:同步VS异步

通讯的一个基本问题是:发出去的消息什么时候需要被接收到?这个问题引出了两个基础概念:“同步通讯”和“异步通讯”。根据理论抽象模型,同步通讯和异步通讯最本质的差别来自于时钟机制的有无。同步通讯的双方需要一个校准的时钟,异步通讯的双方不需要时钟。现实的情况是,没有完全校准的时钟,所以没有绝对的同步通讯。同样,绝对异步通讯意味着无法控制一个发出去的消息被接收到的时间点,无期限的等待一个消息显然毫无实际意义。所以,实际编程中所有的通讯既不是“同步通讯”也不是“异步通讯”;或者说,既是“同步通讯”也是“异步通讯”。特别是对于应用层的通讯,其底层架构可能既包含“同步机制”也包含“异步机制”。判断“同步”和“异步”消息的标准问题太深,而不适合继续展开。作者这里给一些启发式的建议:

  • 发出去的消息是否需要确认,如果不需要确认,更像是异步通讯,这种通讯有时候也称为单向通讯(One-Way Communication)。
  • 如果需要确认,可以根据需要确认的时间长短进行判断。时间长的更像是异步通讯,时间短的更像是同步通讯。当然时间长短的概念是纯粹的主观概念,不是客观标准。
  • 发出去的消息是否阻塞下一个指令的执行,如果阻塞,更像是同步,否则,更像是异步。

无论如何,工程师们不能生活在混沌之中,不做决定往往是最坏的决定。当分析一个通讯需求或者进行通讯构架的时候,工程师们被迫作出“同步”还是“异步”的决定。当决策的结论是“异步通讯”的时候,分布式队列编程模型就是一个备选项。

Who:发送者接收者解耦

在进行通讯需求分析的时候,需要回答的另外一个基本问题是:消息的发送方是否关心谁来接收消息,或者反过来,消息接收方是否关心谁来发送消息。如果工程师的结论是:消息的发送方和接收方不关心对方是谁、以及在哪里,分布式队列编程模型就是一个备选项。因为在这种场景下,分布式队列架构所带来的解耦能给系统架构带来这些好处:

  • 无论是发送方还是接收方,只需要跟消息中间件通讯,接口统一。统一意味着降低开发成本。
  • 在不影响性能的前提下,同一套消息中间件部署,可以被不同业务共享。共享意味着降低运维成本。
  • 发送方或者接收方单方面的部署拓扑的变化不影响对应的另一方。解藕意味着灵活和可扩展。

Where:消息暂存机制

在进行通讯发送方设计的时候,令工程师们苦恼的问题是:如果消息无法被迅速处理掉而产生堆积怎么办、能否被直接抛弃?如果根据需求分析,确认存在消息积存,并且消息不应该被抛弃,就应该考虑分布式队列编程模型构架,因为队列可以暂存消息。

How:如何传递

对通讯需求进行架构,一系列的基础挑战会迎面而来,这包括:

  • 可用性,如何保障通讯的高可用。
  • 可靠性,如何保证消息被可靠地传递。
  • 持久化,如何保证消息不会丢失。
  • 吞吐量和响应时间。
  • 跨平台兼容性。
    除非工程师对造轮子有足够的兴趣,并且有充足的时间,采用一个满足各项指标的分布式队列编程模型就是一个简单的选择。

分布式队列编程定义

很难给出分布式队列编程模型的精确定义,由于本文偏重于应用,作者并不打算完全参照某个标准的模型。总体而言:分布式队列编程模型包含三类角色:发送者(Sender)、分布式队列(Queue)、接收者(Receiver)。发送者和接收者分别指的是生产消息和接收消息的应用程序或服务。
需要重点明确的概念是分布式队列,它是提供以下功能的应用程序或服务:1. 接收“发送者”产生的消息实体;2. 传输、暂存该实体;3. 为“接收者”提供读取该消息实体的功能。特定的场景下,它当然可以是Kafka、RabbitMQ等消息中间件。但它的展现形式并不限于此,例如:

  • 队列可以是一张数据库的表,发送者将消息写入表,接收者从数据表里读消息。
  • 如果一个程序把数据写入Redis等内存Cache里面,另一个程序从Cache里面读取,缓存在这里就是一种分布式队列。
  • 流式编程里面的的数据流传输也是一种队列。
  • 典型的MVC(Model–view–controller)设计模式里面,如果Model的变化需要导致View的变化,也可以通过队列进行传输。这里的分布式队列可以是数据库,也可以是某台服务器上的一块内存。

抽象模型

最基础的分布式队列编程抽象模型是点对点模型,其他抽象构架模型居于改基本模型上各角色的数量和交互变化所导致的不同拓扑图。具体而言,不同数量的发送者、分布式队列以及接收者组合形成了不同的分布式队列编程模型。记住并理解典型的抽象模型结构对需求分析和建模而言至关重要,同时也会有助于学习和深入理解开源框架以及别人的代码。

点对点模型(Point-to-point)

基础模型中,只有一个发送者、一个接收者和一个分布式队列。如下图所示:
点对点模型

生产者消费者模型(Producer–consumer)

如果发送者和接收者都可以有多个部署实例,甚至不同的类型;但是共用同一个队列,这就变成了标准的生产者消费者模型。在该模型,三个角色一般称之为生产者(Producer)、分布式队列(Queue)、消费者(Consumer)。
生产者消费者模型

发布订阅模型(PubSub)

如果只有一类发送者,发送者将产生的消息实体按照不同的主题(Topic)分发到不同的逻辑队列。每种主题队列对应于一类接收者。这就变成了典型的发布订阅模型。在该模型,三个角色一般称之为发布者(Publisher),分布式队列(Queue),订阅者(Subscriber)。
发布订阅模型

MVC模型

如果发送者和接收者存在于同一个实体中,但是共享一个分布式队列。这就很像经典的MVC模型。

MVC模型

编程模型

为了让读者更好地理解分布式队列编程模式概念,这里将其与一些容易混淆的概念做一些对比 。

分布式队列模型编程和异步编程

分布式队列编程模型的通讯机制一般是采用异步机制,但是它并不等同于异步编程。
首先,并非所有的异步编程都需要引入队列的概念,例如:大部分的操作系统异步I/O操作都是通过硬件中断( Hardware Interrupts)来实现的。
其次,异步编程并不一定需要跨进程,所以其应用场景并不一定是分布式环境。
最后,分布式队列编程模型强调发送者、接收者和分布式队列这三个角色共同组成的架构。这三种角色与异步编程没有太多关联。

分布式队列模式编程和流式编程

随着Spark Streaming,Apache Storm等流式框架的广泛应用,流式编程成了当前非常流行的编程模式。但是本文所阐述的分布式队列编程模型和流式编程并非同一概念。
首先,本文的队列编程模式不依赖于任何框架,而流式编程是在具体的流式框架内的编程。
其次,分布式队列编程模型是一个需求解决方案,关注如何根据实际需求进行分布式队列编程建模。流式框架里的数据流一般都通过队列传递,不过,流式编程的关注点比较聚焦,它关注如何从流式框架里获取消息流,进行map、reduce、 join等转型(Transformation)操作、生成新的数据流,最终进行汇总、统计。


分布式队列编程实战篇

这里所有的项目都是作者在新美大工作的真实案例。实战篇的关注点是训练建模思路,所以这些例子都按照挑战、构思、架构三个步骤进行讲解。受限于保密性要求,有些细节并未给出,但这些细节并不影响讲解的完整性。另一方面,特别具体的需求容易让人费解,为了使讲解更加顺畅,作者也会采用一些更通俗易懂的例子。通过本篇的讲解,希望和读者一起去实践“如何从需求出发去构架分布式队列编程模型”。

需要声明的是,这里的解决方案并不是所处场景的最优方案。但是,任何一个稍微复杂的问题,都没有最优解决方案,更谈不上唯一的解决方案。实际上,工程师每天所追寻的只是在满足一定约束条件下的可行方案。当然不同的约束会导致不同的方案,约束的松弛度决定了工程师的可选方案的宽广度。

信息采集处理

信息采集处理应用广泛,例如:广告计费、用户行为收集等。作者碰到的具体项目是为广告系统设计一套高可用的采集计费系统。
典型的广告CPC、CPM计费原理是:收集用户在客户端或者网页上的点击和浏览行为,按照点击和浏览进行计费。计费业务有如下典型特征:

  • 采集者和处理者解耦,采集发生在客户端,而计费发生在服务端。
  • 计费与钱息息相关。
  • 重复计费意味着灾难。
  • 计费是动态实时行为,需要接受预算约束,如果消耗超过预算,则广告投放需要停止。
  • 用户的浏览和点击量非常大。

挑战

计费业务的典型特征给我们带来了如下挑战:

  • 高吞吐量--广告的浏览和点击量非常巨大,我们需要设计一个高吞吐量的采集架构。
  • 高可用性--计费信息的丢失意味着直接的金钱损失。任何处理服务器的崩溃不应该导致系统不可用。
  • 高一致性要求--计费是一个实时动态处理过程,但要受到预算的约束。收集到的浏览和点击行为如果不能快速处理,可能会导致预算花超,或者点击率预估不准确。所以采集到的信息应该在最短的时间内传输到计费中心进行计费。
  • 完整性约束--这包括反作弊规则,单个用户行为不能重复计费等。这要求计费是一个集中行为而非分布式行为。
  • 持久化要求--计费信息需要持久化,避免因为机器崩溃而导致收集到的数据产生丢失。

构思

采集的高可用性意味着我们需要多台服务器同时采集,为了避免单IDC故障,采集服务器需要部署在多IDC里面。
实现一个高可用、高吞吐量、高一致性的信息传递系统显然是一个挑战,为了控制项目开发成本,采用开源的消息中间件进行消息传输就成了必然选择。
完整性约束要求集中进行计费,所以计费系统发生在核心IDC。
计费服务并不关心采集点在哪里,采集服务也并不关心谁进行计费。
根据以上构思,我们认为采集计费符合典型的“生产者消费者模型”。

架构

采集计费系统架构图如下:

  • 用户点击浏览收集服务(Click/View Collector)作为生产者部署在多个机房里,以提高收集服务可用性。
  • 每个机房里采集到的数据通过消息队列中间件发送到核心机房IDC_Master。
  • Billing服务作为消费者部署在核心机房集中计费。
    计费采集架构

采用此架构,我们可以在如下方面做进一步优化:

  • 提高可扩展性,如果一个Billing部署实例在性能上无法满足要求,可以对采集的数据进行主题分区(Topic Partition)计费,即采用发布订阅模式以提高可扩展性(Scalability)。
  • 全局排重和反作弊。采用集中计费架构解决了点击浏览排重的问题,另一方面,这也给反作弊提供了全局信息。
  • 提高计费系统的可用性。采用下文单例服务优化策略,在保障计费系统集中性的同时,提高计费系统可用性。

分布式缓存更新(Distributed Cache Replacement)

缓存是一个非常宽泛的概念,几乎存在于系统各个层级。典型的缓存访问流程如下:

  • 接收到请求后,先读取缓存,如果命中则返回结果。
  • 如果缓存不命中,读取DB或其它持久层服务,更新缓存并返回结果。
    缓存更新
    对于已经存入缓存的数据,其更新时机和更新频率是一个经典问题,即缓存更新机制(Cache Replacement Algorithms )。典型的缓存更新机制包括:近期最少使用算法(LRU)、最不经常使用算法(LFU)。这两种缓存更新机制的典型实现是:启动一个后台进程,定期清理最近没有使用的,或者在一段时间内最少使用的数据。由于存在缓存驱逐机制,当一个请求在没有命中缓存时,业务层需要从持久层中获取信息并更新缓存,提高一致性。

挑战

分布式缓存给缓存更新机制带来了新的问题:

  • 数据一致性低。分布式缓存中键值数量巨大,从而导致LRU或者LFU算法更新周期很长。在分布式缓存中,拿LRU算法举例,其典型做法是为每个Key值设置一个生存时间(TTL),生存时间到期后将该键值从缓存中驱逐除去。考虑到分布式缓存中庞大的键值数量,生存时间往往会设置的比较长,这就导致缓存和持久层数据不一致时间很长。如果生存时间设置过短,大量请求无法命中缓存被迫读取持久层,系统响应时间会急剧恶化。
  • 新数据不可用。在很多场景下,由于分布式缓存和持久层的访问性能相差太大,在缓存不命中的情况下,一些应用层服务不会尝试读取持久层,而直接返回空结果。漫长的缓存更新周期意味着新数据的可用性就被牺牲了。从统计的角度来讲,新键值需要等待半个更新周期才会可用。

构思

根据上面的分析,分布式缓存需要解决的问题是:在保证读取性能的前提下,尽可能地提高老数据的一致性和新数据的可用性。如果仍然假定最近被访问的键值最有可能被再次访问(这是LRU或者LFU成立的前提),键值每次被访问后触发一次异步更新就是提高可用性和一致性最早的时机。无论是高性能要求还是业务解耦都要求缓存读取和缓存更新分开,所以我们应该构建一个单独的集中的缓存更新服务。集中进行缓存更新的另外一个好处来自于频率控制。由于在一段时间内,很多类型访问键值的数量满足高斯分布,短时间内重复对同一个键值进行更新Cache并不会带来明显的好处,甚至造成缓存性能的下降。通过控制同一键值的更新频率可以大大缓解该问题,同时有利于提高整体数据的一致性,参见“排重优化”。

综上所述,业务访问方需要把请求键值快速传输给缓存更新方,它们之间不关心对方的业务。要快速、高性能地实现大量请求键值消息的传输,高性能分布式消息中间件就是一个可选项。这三方一起组成了一个典型的分布式队列编程模型。

架构

如下图,所有的业务请求方作为生产者,在返回业务代码处理之前将请求键值写入高性能队列。Cache Updater作为消费者从队列中读取请求键值,将持久层中数据更新到缓存中。
分布式缓存更新
采用此架构,我们可以在如下方面做进一步优化:

  • 提高可扩展性,如果一个Cache Updater在性能上无法满足要求,可以对键值进行主题分区(Topic Partition)进行并行缓存更新,即采用发布订阅模式以提高可扩展性(Scalability)。
  • 更新频率控制。缓存更新都集中处理,对于发布订阅模式,同一类主题(Topic)的键值集中处理。Cache Updater可以控制对同一键值的在短期内的更新频率(参见下文排重优化)。

后台任务处理

典型的后台任务处理应用包括工单处理、火车票预订系统、机票选座等。我们所面对的问题是为运营人员创建工单。一次可以为多个运营人员创建多个工单。这个应用场景和火车票购买非常类似。工单相对来说更加抽象,所以,下文会结合火车票购买和运营人员工单分配这两种场景同时讲解。典型的工单创建要经历两个阶段:数据筛选阶段、工单创建阶段。例如,在火车票预订场景,数据筛选阶段用户选择特定时间、特定类型的火车,而在工单创建阶段,用户下单购买火车票。

挑战

工单创建往往会面临如下挑战:

  • 数据一致性问题。以火车票预订为例,用户筛选火车票和最终购买之间往往有一定的时延,意味着两个操作之间数据是不一致的。在筛选阶段,工程师们需决定是否进行车票锁定,如果不锁定,则无法保证出票成功。反之,如果在筛选地时候锁定车票,则会大大降低系统效率和出票吞吐量。
  • 约束问题。工单创建需要满足很多约束,主要包含两种类型:动态约束,与操作者的操作行为有关,例如购买几张火车票的决定往往发生在筛选最后阶段。隐性约束,这种约束很难通过界面进行展示,例如一个用户购买了5张火车票,这些票应该是在同一个车厢的临近位置。
  • 优化问题。工单创建往往是约束下的优化,这是典型的统筹优化问题,而统筹优化往往需要比较长的时间。
  • 响应时间问题。对于多任务工单,一个请求意味着多个任务产生。这些任务的创建往往需要遵循事务性原则,即All or Nothing。在数据层面,这意味着工单之间需要满足串行化需求(Serializability)。大数据量的串行化往往意味着锁冲突延迟甚至失败。无论是延迟机制所导致的长时延,还是高创建失败率,都会大大伤害用户体验。

构思

如果将用户筛选的最终规则做为消息存储下来,并发送给工单创建系统。此时,工单创建系统将具备创建工单所需的全局信息,具备在满足各种约束的条件下进行统筹优化的能力。如果工单创建阶段采用单实例部署,就可以避免数据锁定问题,同时也意味着没有锁冲突,所以也不会有死锁或任务延迟问题。
居于以上思路,在多工单处理系统的模型中,筛选阶段的规则创建系统将充当生产者角色,工单创建系统将充当消费者角色,筛选规则将作为消息在两者之间进行传递。这就是典型的分布式队列编程架构。根据工单创建量的不同,可以采用数据库或开源的分布式消息中间件作为分布式队列。

架构

该架构流程如下图:

  • 用户首选进行规则创建,这个过程主要是一些搜索筛选操作;
  • 用户点击工单创建,TicketRule Generator将把所有的筛选性组装成规则消息并发送到队列里面去;
  • Ticket Generator作为一个消费者,实时从队列中读取工单创建请求,开始真正创建工单。
    工单创建
    采用该架构,我们在数据锁定、运筹优化、原子性问题都能得到比较好成果:
  • 数据锁定推迟到工单创建阶段,可以减少数据锁定范围,最大程度的降低工单创建对其他在线操作的影响范围。
  • 如果需要进行统筹优化,可以将Ticket Generator以单例模式进行部署(参见单例服务优化)。这样,Ticket Generator可以读取一段时间内的工单请求,进行全局优化。例如,在我们的项目中,在某种条件下,运营人员需要满足分级公平原则,即相同级别的运营人员的工单数量应该接近,不同级别的运营人员工单数量应该有所区分。如果不集中进行统筹优化,实现这种优化规则将会很困难。
  • 保障了约束完整性。例如,在我们的场景里面,每个运营人员每天能够处理的工单是有数量限制的,如果采用并行处理的方式,这种完整性约束将会很难实施。

预告

下周我们会推出优化篇,重点阐述在工程师在运用分布式队列编程构架的时候,在生产者、分布式队列以及消费者这三个环节的注意点以及优化建议,欢迎关注!

参考资料

[1] RabbitMQ, Highly Available Queues.
[2] IBM Knowledge Center, Introduction to message queuing.
[3] Wikipedia, Serializability.
[4] Hadoop, ZooKeeper Recipes and Solutions.
[5] Apache Kafka.
[6] Lamport L, Paxos Made Simple.

原文出自:https://tech.meituan.com/archives

分布式系统互斥性与幂等性问题的分析与解决

美团点评阅读(2785)

前言

随着互联网信息技术的飞速发展,数据量不断增大,业务逻辑也日趋复杂,对系统的高并发访问、海量数据处理的场景也越来越多。如何用较低成本实现系统的高可用、易伸缩、可扩展等目标就显得越发重要。为了解决这一系列问题,系统架构也在不断演进。传统的集中式系统已经逐渐无法满足要求,分布式系统被使用在更多的场景中。

分布式系统由独立的服务器通过网络松散耦合组成。在这个系统中每个服务器都是一台独立的主机,服务器之间通过内部网络连接。分布式系统有以下几个特点:

  • 可扩展性:可通过横向水平扩展提高系统的性能和吞吐量。
  • 高可靠性:高容错,即使系统中一台或几台故障,系统仍可提供服务。
  • 高并发性:各机器并行独立处理和计算。
  • 廉价高效:多台小型机而非单台高性能机。

然而,在分布式系统中,其环境的复杂度、网络的不确定性会造成诸如时钟不一致、“拜占庭将军问题”(Byzantine failure)等。存在于集中式系统中的机器宕机、消息丢失等问题也会在分布式环境中变得更加复杂。

基于分布式系统的这些特征,有两种问题逐渐成为了分布式环境中需要重点关注和解决的典型问题:

  • 互斥性问题。
  • 幂等性问题。

今天我们就针对这两个问题来进行分析。

互斥性问题

先看两个常见的例子:

例1:某服务记录关键数据X,当前值为100。A请求需要将X增加200;同时,B请求需要将X减100。
在理想的情况下,A先读取到X=100,然后X增加200,最后写入X=300。B请求接着从读取X=300,减少100,最后写入X=200。
然而在真实情况下,如果不做任何处理,则可能会出现:A和B同时读取到X=100;A写入之前B读取到X;B比A先写入等等情况。

例2:某服务提供一组任务,A请求随机从任务组中获取一个任务;B请求随机从任务组中获取一个任务。
在理想的情况下,A从任务组中挑选一个任务,任务组删除该任务,B从剩下的的任务中再挑一个,任务组删除该任务。
同样的,在真实情况下,如果不做任何处理,可能会出现A和B挑中了同一个任务的情况。

以上的两个例子,都存在操作互斥性的问题。互斥性问题用通俗的话来讲,就是对共享资源的抢占问题。如果不同的请求对同一个或者同一组资源读取并修改时,无法保证按序执行,无法保证一个操作的原子性,那么就很有可能会出现预期外的情况。因此操作的互斥性问题,也可以理解为一个需要保证时序性、原子性的问题。

在传统的基于数据库的架构中,对于数据的抢占问题往往是通过数据库事务(ACID)来保证的。在分布式环境中,出于对性能以及一致性敏感度的要求,使得分布式锁成为了一种比较常见而高效的解决方案。

事实上,操作互斥性问题也并非分布式环境所独有,在传统的多线程、多进程情况下已经有了很好的解决方案。因此在研究分布式锁之前,我们先来分析下这两种情况的解决方案,以期能够对分布式锁的解决方案提供一些实现思路。

多线程环境解决方案及原理

解决方案

《Thinking in Java》书中写到:

基本上所有的并发模式在解决线程冲突问题的时候,都是采用序列化访问共享资源的方案。

在多线程环境中,线程之间因为公用一些存储空间,冲突问题时有发生。解决冲突问题最普遍的方式就是用互斥锁把该资源或对该资源的操作保护起来。

Java JDK中提供了两种互斥锁Lock和synchronized。不同的线程之间对同一资源进行抢占,该资源通常表现为某个类的普通成员变量。因此,利用ReentrantLock或者synchronized将共享的变量及其操作锁住,即可基本解决资源抢占的问题。

下面来简单聊一聊两者的实现原理。

原理

ReentrantLock

ReentrantLock主要利用CAS+CLH队列来实现。它支持公平锁和非公平锁,两者的实现类似。

  • CAS:Compare and Swap,比较并交换。CAS有3个操作数:内存值V、预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。该操作是一个原子操作,被广泛的应用在Java的底层实现中。在Java中,CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现。
  • CLH队列:带头结点的双向非循环链表(如下图所示):

CLH队列

ReentrantLock的基本实现可以概括为:先通过CAS尝试获取锁。如果此时已经有线程占据了锁,那就加入CLH队列并且被挂起。当锁被释放之后,排在CLH队列队首的线程会被唤醒,然后CAS再次尝试获取锁。在这个时候,如果:

  • 非公平锁:如果同时还有另一个线程进来尝试获取,那么有可能会让这个线程抢先获取;
  • 公平锁:如果同时还有另一个线程进来尝试获取,当它发现自己不是在队首的话,就会排到队尾,由队首的线程获取到锁。

下面分析下两个片段:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在尝试获取锁的时候,会先调用上面的方法。如果状态为0,则表明此时无人占有锁。此时尝试进行set,一旦成功,则成功占有锁。如果状态不为0,再判断是否是当前线程获取到锁。如果是的话,将状态+1,因为此时就是当前线程,所以不用CAS。这也就是可重入锁的实现原理。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

该方法是在尝试获取锁失败加入CHL队尾之后,如果发现前序节点是head,则CAS再尝试获取一次。否则,则会根据前序节点的状态判断是否需要阻塞。如果需要阻塞,则调用LockSupport的park方法阻塞该线程。

synchronized

在Java语言中存在两种内建的synchronized语法:synchronized语句、synchronized方法。

  • synchronized语句:当源代码被编译成字节码的时候,会在同步块的入口位置和退出位置分别插入monitorenter和monitorexit字节码指令;
  • synchronized方法:在Class文件的方法表中将该方法的access_flags字段中的synchronized标志位置1。这个在specification中没有明确说明。

在Java虚拟机的specification中,有关于monitorenter和monitorexit字节码指令的详细描述:http://docs.oracle.com/Javase/specs/jvms/se7/html/jvms-6.html#jvms-6.5.monitorenter

monitorenter

The objectref must be of type reference.
Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:

  • If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
  • If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
  • If another thread already owns the monitor associated with objectref, the thread blocks until the monitor’s entry count is zero, then tries again to gain ownership.

每个对象都有一个锁,也就是监视器(monitor)。当monitor被占有时就表示它被锁定。线程执行monitorenter指令时尝试获取对象所对应的monitor的所有权,过程如下:

  • 如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者;
  • 如果线程已经拥有了该monitor,只是重新进入,则进入monitor的进入数加1;
  • 如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。
monitorexit

The objectref must be of type reference.
The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref.
The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.

执行monitorexit的线程必须是相应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个monitor的所有权。

在JDK1.6及其之前的版本中monitorenter和monitorexit字节码依赖于底层的操作系统的Mutex Lock来实现的,但是由于使用Mutex Lock需要将当前线程挂起并从用户态切换到内核态来执行,这种切换的代价是非常昂贵的。然而在现实中的大部分情况下,同步方法是运行在单线程环境(无锁竞争环境)。如果每次都调用Mutex Lock将严重的影响程序的性能。因此在JDK 1.6之后的版本中对锁的实现做了大量的优化,这些优化在很大程度上减少或避免了Mutex Lock的使用。

多进程的解决方案

在多道程序系统中存在许多进程,它们共享各种资源,然而有很多资源一次只能供一个进程使用,这便是临界资源。多进程中的临界资源大致上可以分为两类,一类是物理上的真实资源,如打印机;一类是硬盘或内存中的共享数据,如共享内存等。而进程内互斥访问临界资源的代码被称为临界区。

针对临界资源的互斥访问,JVM层面的锁就已经失去效力了。在多进程的情况下,主要还是利用操作系统层面的进程间通信原理来解决临界资源的抢占问题。比较常见的一种方法便是使用信号量(Semaphores)。

信号量在POSIX标准下有两种,分别为有名信号量和无名信号量。无名信号量通常保存在共享内存中,而有名信号量是与一个特定的文件名称相关联。信号量是一个整数变量,有计数信号量和二值信号量两种。对信号量的操作,主要是P操作(wait)和V操作(signal)。

  • P操作:先检查信号量的大小,若值大于零,则将信号量减1,同时进程获得共享资源的访问权限,继续执行;若小于或者等于零,则该进程被阻塞后,进入等待队列。
  • V操作:该操作将信号量的值加1,如果有进程阻塞着等待该信号量,那么其中一个进程将被唤醒。

举个例子,设信号量为1,当一个进程A在进入临界区之前,先进行P操作。发现值大于零,那么就将信号量减为0,进入临界区执行。此时,若另一个进程B也要进去临界区,进行P操作,发现信号量等于0,则会被阻塞。当进程A退出临界区时,会进行V操作,将信号量的值加1,并唤醒阻塞的进程B。此时B就可以进入临界区了。

这种方式,其实和多线程环境下的加解锁非常类似。因此用信号量处理临界资源抢占,也可以简单地理解为对临界区进行加锁。

通过上面的一些了解,我们可以概括出解决互斥性问题,即资源抢占的基本方式为:

对共享资源的操作前后(进入退出临界区)加解锁,保证不同线程或进程可以互斥有序的操作资源。

加解锁方式,有显式的加解锁,如ReentrantLock或信号量;也有隐式的加解锁,如synchronized。那么在分布式环境中,为了保证不同JVM不同主机间不会出现资源抢占,那么同样只要对临界区加解锁就可以了。

然而在多线程和多进程中,锁已经有比较完善的实现,直接使用即可。但是在分布式环境下,就需要我们自己来实现分布式锁。

分布式环境下的解决方案——分布式锁

首先,我们来看看分布式锁的基本条件。

分布式锁条件

基本条件

再回顾下多线程和多进程环境下的锁,可以发现锁的实现有很多共通之处,它们都需要满足一些最基本的条件:

  1. 需要有存储锁的空间,并且锁的空间是可以访问到的。
  2. 锁需要被唯一标识。
  3. 锁要有至少两种状态。

仔细分析这三个条件:

  • 存储空间

锁是一个抽象的概念,锁的实现,需要依存于一个可以存储锁的空间。在多线程中是内存,在多进程中是内存或者磁盘。更重要的是,这个空间是可以被访问到的。多线程中,不同的线程都可以访问到堆中的成员变量;在多进程中,不同的进程可以访问到共享内存中的数据或者存储在磁盘中的文件。但是在分布式环境中,不同的主机很难访问对方的内存或磁盘。这就需要一个都能访问到的外部空间来作为存储空间。

最普遍的外部存储空间就是数据库了,事实上也确实有基于数据库做分布式锁(行锁、version乐观锁),如quartz集群架构中就有所使用。除此以外,还有各式缓存如Redis、Tair、Memcached、Mongodb,当然还有专门的分布式协调服务Zookeeper,甚至是另一台主机。只要可以存储数据、锁在其中可以被多主机访问到,那就可以作为分布式锁的存储空间。

  • 唯一标识

不同的共享资源,必然需要用不同的锁进行保护,因此相应的锁必须有唯一的标识。在多线程环境中,锁可以是一个对象,那么对这个对象的引用便是这个唯一标识。多进程环境中,信号量在共享内存中也是由引用来作为唯一的标识。但是如果不在内存中,失去了对锁的引用,如何唯一标识它呢?上文提到的有名信号量,便是用硬盘中的文件名作为唯一标识。因此,在分布式环境中,只要给这个锁设定一个名称,并且保证这个名称是全局唯一的,那么就可以作为唯一标识。

  • 至少两种状态

为了给临界区加锁和解锁,需要存储两种不同的状态。如ReentrantLock中的status,0表示没有线程竞争,大于0表示有线程竞争;信号量大于0表示可以进入临界区,小于等于0则表示需要被阻塞。因此只要在分布式环境中,锁的状态有两种或以上:如有锁、没锁;存在、不存在等等,均可以实现。

有了这三个条件,基本就可以实现一个简单的分布式锁了。下面以数据库为例,实现一个简单的分布式锁:
数据库表,字段为锁的ID(唯一标识),锁的状态(0表示没有被锁,1表示被锁)。
伪代码为:

lock = mysql.get(id);
while(lock.status == 1) {
    sleep(100);
}
mysql.update(lock.status = 1);
doSomething();
mysql.update(lock.status = 0);

问题

以上的方式即可以实现一个粗糙的分布式锁,但是这样的实现,有没有什么问题呢?

  • 问题1:锁状态判断原子性无法保证
    从读取锁的状态,到判断该状态是否为被锁,需要经历两步操作。如果不能保证这两步的原子性,就可能导致不止一个请求获取到了锁,这显然是不行的。因此,我们需要保证锁状态判断的原子性。

  • 问题2:网络断开或主机宕机,锁状态无法清除
    假设在主机已经获取到锁的情况下,突然出现了网络断开或者主机宕机,如果不做任何处理该锁将仍然处于被锁定的状态。那么之后所有的请求都无法再成功抢占到这个锁。因此,我们需要在持有锁的主机宕机或者网络断开的时候,及时的释放掉这把锁。

  • 问题3:无法保证释放的是自己上锁的那把锁
    在解决了问题2的情况下再设想一下,假设持有锁的主机A在临界区遇到网络抖动导致网络断开,分布式锁及时的释放掉了这把锁。之后,另一个主机B占有了这把锁,但是此时主机A网络恢复,退出临界区时解锁。由于都是同一把锁,所以A就会将B的锁解开。此时如果有第三个主机尝试抢占这把锁,也将会成功获得。因此,我们需要在解锁时,确定自己解的这个锁正是自己锁上的。

进阶条件

如果分布式锁的实现,还能再解决上面的三个问题,那么就可以算是一个相对完整的分布式锁了。然而,在实际的系统环境中,还会对分布式锁有更高级的要求。

  1. 可重入:线程中的可重入,指的是外层函数获得锁之后,内层也可以获得锁,ReentrantLock和synchronized都是可重入锁;衍生到分布式环境中,一般仍然指的是线程的可重入,在绝大多数分布式环境中,都要求分布式锁是可重入的。
  2. 惊群效应(Herd Effect):在分布式锁中,惊群效应指的是,在有多个请求等待获取锁的时候,一旦占有锁的线程释放之后,如果所有等待的方都同时被唤醒,尝试抢占锁。但是这样的情况会造成比较大的开销,那么在实现分布式锁的时候,应该尽量避免惊群效应的产生。
  3. 公平锁和非公平锁:不同的需求,可能需要不同的分布式锁。非公平锁普遍比公平锁开销小。但是业务需求如果必须要锁的竞争者按顺序获得锁,那么就需要实现公平锁。
  4. 阻塞锁和自旋锁:针对不同的使用场景,阻塞锁和自旋锁的效率也会有所不同。阻塞锁会有上下文切换,如果并发量比较高且临界区的操作耗时比较短,那么造成的性能开销就比较大了。但是如果临界区操作耗时比较长,一直保持自旋,也会对CPU造成更大的负荷。

保留以上所有问题和条件,我们接下来看一些比较典型的实现方案。

典型实现

ZooKeeper的实现

ZooKeeper(以下简称“ZK”)中有一种节点叫做顺序节点,假如我们在/lock/目录下创建3个节点,ZK集群会按照发起创建的顺序来创建节点,节点分别为/lock/0000000001、/lock/0000000002、/lock/0000000003。

ZK中还有一种名为临时节点的节点,临时节点由某个客户端创建,当客户端与ZK集群断开连接,则该节点自动被删除。EPHEMERAL_SEQUENTIAL为临时顺序节点。

根据ZK中节点是否存在,可以作为分布式锁的锁状态,以此来实现一个分布式锁,下面是分布式锁的基本逻辑:

  1. 客户端调用create()方法创建名为“/dlm-locks/lockname/lock-”的临时顺序节点。
  2. 客户端调用getChildren(“lockname”)方法来获取所有已经创建的子节点。
  3. 客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
  4. 如果创建的节点不是所有节点中需要最小的,那么则监视比自己创建节点的序列号小的最大的节点,进入等待。直到下次监视的子节点变更的时候,再进行子节点的获取,判断是否获取锁。

释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可,不过也仍需要考虑删除节点失败等异常情况。

开源的基于ZK的Menagerie的源码就是一个典型的例子:https://github.com/sfines/menagerie

Menagerie中的lock首先实现了可重入锁,利用ThreadLocal存储进入的次数,每次加锁次数加1,每次解锁次数减1。如果判断出是当前线程持有锁,就不用走获取锁的流程。

通过tryAcquireDistributed方法尝试获取锁,循环判断前序节点是否存在,如果存在则监视该节点并且返回获取失败。如果前序节点不存在,则再判断更前一个节点。如果判断出自己是第一个节点,则返回获取成功。

为了在别的线程占有锁的时候阻塞,代码中使用JUC的condition来完成。如果获取尝试锁失败,则进入等待且放弃localLock,等待前序节点唤醒。而localLock是一个本地的公平锁,使得condition可以公平的进行唤醒,配合循环判断前序节点,实现了一个公平锁。

这种实现方式非常类似于ReentrantLock的CHL队列,而且zk的临时节点可以直接避免网络断开或主机宕机,锁状态无法清除的问题,顺序节点可以避免惊群效应。这些特性都使得利用ZK实现分布式锁成为了最普遍的方案之一。

Redis的实现

Redis的分布式缓存特性使其成为了分布式锁的一种基础实现。通过Redis中是否存在某个锁ID,则可以判断是否上锁。为了保证判断锁是否存在的原子性,保证只有一个线程获取同一把锁,Redis有SETNX(即SET if Not
eXists)和GETSET(先写新值,返回旧值,原子性操作,可以用于分辨是不是首次操作)操作。

为了防止主机宕机或网络断开之后的死锁,Redis没有ZK那种天然的实现方式,只能依赖设置超时时间来规避。

以下是一种比较普遍但不太完善的Redis分布式锁的实现步骤(与下图一一对应):

  1. 线程A发送SETNX lock.orderid 尝试获得锁,如果锁不存在,则set并获得锁。
  2. 如果锁存在,则再判断锁的值(时间戳)是否大于当前时间,如果没有超时,则等待一下再重试。
  3. 如果已经超时了,在用GETSET lock.{orderid} 来尝试获取锁,如果这时候拿到的时间戳仍旧超时,则说明已经获得锁了。
  4. 如果在此之前,另一个线程C快一步执行了上面的操作,那么A拿到的时间戳是个未超时的值,这时A没有如期获得锁,需要再次等待或重试。

redis分布式锁实现

该实现还有一个需要考虑的问题是全局时钟问题,由于生产环境主机时钟不能保证完全同步,对时间戳的判断也可能会产生误差。

以上是Redis的一种常见的实现方式,除此以外还可以用SETNX+EXPIRE来实现。Redisson是一个官方推荐的Redis客户端并且实现了很多分布式的功能。它的分布式锁就提供了一种更完善的解决方案,源码:https://github.com/mrniko/redisson

Tair的实现

Tair和Redis的实现类似,Tair客户端封装了一个expireLock的方法:通过锁状态和过期时间戳来共同判断锁是否存在,只有锁已经存在且没有过期的状态才判定为有锁状态。在有锁状态下,不能加锁,能通过大于或等于过期时间的时间戳进行解锁。

采用这样的方式,可以不用在Value中存储时间戳,并且保证了判断是否有锁的原子性。更值得注意的是,由于超时时间是由Tair判断,所以避免了不同主机时钟不一致的情况。

以上的几种分布式锁实现方式,都是比较常见且有些已经在生产环境中应用。随着应用环境越来越复杂,这些实现可能仍然会遇到一些挑战。

  • 强依赖于外部组件:分布式锁的实现都需要依赖于外部数据存储如ZK、Redis等等,因此一旦这些外部组件出现故障,那么分布式锁就不可用了。
  • 无法完全满足需求:不同分布式锁的实现,都有相应的特点,对于一些需求并不能很好的满足,如实现公平锁、给等待锁加超时时间等等。

基于以上问题,结合多种实现方式,我们开发了Cerberus(得名自希腊神话里守卫地狱的猛犬),致力于提供灵活可靠的分布式锁。

Cerberus分布式锁

Cerberus有以下几个特点。

特点一:一套接口多种引擎

Cerberus分布式锁使用了多种引擎实现方式(Tair、ZK、未来支持Redis),支持使用方自主选择所需的一种或多种引擎。这样可以结合引擎特点,选择符合实际业务需求和系统架构的方式。

Cerberus分布式锁将不同引擎的接口抽象为一套,屏蔽了不同引擎的实现细节。使得使用方可以专注于业务逻辑,也可以任意选择并切换引擎而不必更改任何的业务代码。

如果使用方选择了一种以上的引擎,那么以配置顺序来区分主副引擎。以下是使用主引擎的推荐:

功能需求TairZK
并发量高
响应时间敏感
临界区执行时间长
公平锁
非公平锁
读写锁

特点二:使用灵活、学习成本低

下面是Cerberus的lock方法,这些方法和JUC的ReentrantLock的方式保持一致,使用非常灵活且不需要额外的学习时间。

  • void lock();
    获取锁,如果锁被占用,将禁用当前线程,并且在获得锁之前,该线程将一直处于阻塞状态。
  • boolean tryLock();
    仅在调用时锁为空闲状态才获取该锁。
    如果锁可用,则获取锁,并立即返回值 true。如果锁不可用,则此方法将立即返回值 false。
  • boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。
    如果在给定时间内锁可用,则获取锁,并立即返回值 true。如果在给定时间内锁一直不可用,则此方法将立即返回值false。
  • void lockInterruptibly() throws InterruptedException;
    获取锁,如果锁被占用,则一直等待直到线程被中断或者获取到锁。
  • void unlock();
    释放当前持有的锁。

特点三:支持一键降级

Cerberus提供了实时切换引擎的接口:

  • String switchEngine()
    转换分布式锁引擎,按配置的引擎的顺序循环转换。
    返回值:返回当前的engine名字,如:”zk”。
  • String switchEngine(String engineName)
    转换分布式锁引擎,切换为指定的引擎。
    参数:engineName – 引擎的名字,同配置bean的名字,”zk”/”tair”。
    返回值:返回当前的engine名字,如:”zk”。

当使用方选择了两种引擎,平时分布式锁会工作在主引擎上。一旦所依赖的主引擎出现故障,那么使用方可以通过自动或者手动方式调用该切换引擎接口,平滑的将分布式锁切换到另一个引擎上以将风险降到最低。自动切换方式可以利用Hystrix实现。手动切换推荐的一个方案则是使用美团点评基于Zookeeper的基础组件MCC,通过监听MCC配置项更改,来达到手动将分布式系统所有主机同步切换引擎的目的。需要注意的是,切换引擎目前并不会迁移原引擎已有的锁。这样做的目的是出于必要性、系统复杂度和可靠性的综合考虑。在实际情况下,引擎故障到切换引擎,尤其是手动切换引擎的时间,要远大于分布式锁的存活时间。作为较轻量级的Cerberus来说,迁移锁会带来不必要的开销以及较高的系统复杂度。鉴于此,如果想要保证在引擎故障后的绝对可靠,那么则需要结合其他方案来进行处理。

除此以外,Cerberus还提供了内置公用集群,免去搭建和配置集群的烦恼。Cerberus也有一套完善的应用授权机制,以此防止业务方未经评估使用,对集群造成影响。

目前,Cerberus分布式锁已经持续迭代了8个版本,先后在美团点评多个项目中稳定运行。

幂等性问题

所谓幂等,简单地说,就是对接口的多次调用所产生的结果和调用一次是一致的。扩展一下,这里的接口,可以理解为对外发布的HTTP接口或者Thrift接口,也可以是接收消息的内部接口,甚至是一个内部方法或操作。

那么我们为什么需要接口具有幂等性呢?设想一下以下情形:

  • 在App中下订单的时候,点击确认之后,没反应,就又点击了几次。在这种情况下,如果无法保证该接口的幂等性,那么将会出现重复下单问题。
  • 在接收消息的时候,消息推送重复。如果处理消息的接口无法保证幂等,那么重复消费消息产生的影响可能会非常大。

在分布式环境中,网络环境更加复杂,因前端操作抖动、网络故障、消息重复、响应速度慢等原因,对接口的重复调用概率会比集中式环境下更大,尤其是重复消息在分布式环境中很难避免。Tyler Treat也在《You Cannot Have Exactly-Once Delivery》一文中提到:

Within the context of a distributed system, you cannot have exactly-once message delivery.

分布式环境中,有些接口是天然保证幂等性的,如查询操作。有些对数据的修改是一个常量,并且无其他记录和操作,那也可以说是具有幂等性的。其他情况下,所有涉及对数据的修改、状态的变更就都有必要防止重复性操作的发生。通过间接的实现接口的幂等性来防止重复操作所带来的影响,成为了一种有效的解决方案。

GTIS

GTIS就是这样的一个解决方案。它是一个轻量的重复操作关卡系统,它能够确保在分布式环境中操作的唯一性。我们可以用它来间接保证每个操作的幂等性。它具有如下特点:

  • 高效:低延时,单个方法平均响应时间在2ms内,几乎不会对业务造成影响;
  • 可靠:提供降级策略,以应对外部存储引擎故障所造成的影响;提供应用鉴权,提供集群配置自定义,降低不同业务之间的干扰;
  • 简单:接入简捷方便,学习成本低。只需简单的配置,在代码中进行两个方法的调用即可完成所有的接入工作;
  • 灵活:提供多种接口参数、使用策略,以满足不同的业务需求。

实现原理

基本原理

GTIS的实现思路是将每一个不同的业务操作赋予其唯一性。这个唯一性是通过对不同操作所对应的唯一的内容特性生成一个唯一的全局ID来实现的。基本原则为:相同的操作生成相同的全局ID;不同的操作生成不同的全局ID。

生成的全局ID需要存储在外部存储引擎中,数据库、Redis亦或是Tair等等均可实现。考虑到Tair天生分布式和持久化的优势,目前的GTIS存储在Tair中。其相应的key和value如下:

  • key:将对于不同的业务,采用APP_KEY+业务操作内容特性生成一个唯一标识trans_contents。然后对唯一标识进行加密生成全局ID作为Key。
  • value:current_timestamp + trans_contents,current_timestamp用于标识当前的操作线程。

判断是否重复,主要利用Tair的SETNX方法,如果原来没有值则set且返回成功,如果已经有值则返回失败。

内部流程

GTIS的内部实现流程为:

  1. 业务方在业务操作之前,生成一个能够唯一标识该操作的transContents,传入GTIS;
  2. GTIS根据传入的transContents,用MD5生成全局ID;
  3. GTIS将全局ID作为key,current_timestamp+transContents作为value放入Tair进行setNx,将结果返回给业务方;
  4. 业务方根据返回结果确定能否开始进行业务操作;
  5. 若能,开始进行操作;若不能,则结束当前操作;
  6. 业务方将操作结果和请求结果传入GTIS,系统进行一次请求结果的检验;
  7. 若该次操作成功,GTIS根据key取出value值,跟传入的返回结果进行比对,如果两者相等,则将该全局ID的过期时间改为较长时间;
  8. GTIS返回最终结果。

实现难点

GTIS的实现难点在于如何保证其判断重复的可靠性。由于分布式环境的复杂度和业务操作的不确定性,在上一章节分布式锁的实现中考虑的网络断开或主机宕机等等问题,同样需要在GTIS中设法解决。这里列出几个典型的场景:

  • 如果操作执行失败,理想的情况应该是另一个相同的操作可以立即进行。因此,需要对业务方的操作结果进行判断,如果操作失败,那么就需要立即删除该全局ID;
  • 如果操作超时或主机宕机,当前的操作无法告知GTIS操作是否成功。那么我们必须引入超时机制,一旦长时间获取不到业务方的操作反馈,那么也需要该全局ID失效;
  • 结合上两个场景,既然全局ID会失效并且可能会被删除,那就需要保证删除的不是另一个相同操作的全局ID。这就需要将特殊的标识记录下来,并由此来判断。这里所用的标识为当前时间戳。

可以看到,解决这些问题的思路,也和上一章节中的实现有很多类似的地方。除此以外,还有更多的场景需要考虑和解决,所有分支流程如下:
GTIS原理

使用说明

使用时,业务方只需要在操作的前后调用GTIS的前置方法和后置方法,如下图所示。如果前置方法返回可进行操作,则说明此时无重复操作,可以进行。否则则直接结束操作。

redis分布式锁实现

使用方需要考虑的主要是下面两个参数:

  • 空间全局性:业务方输入的能够标志操作唯一性的内容特性,可以是唯一性的String类型的ID,也可以是map、POJO等形式。如订单ID等
  • 时间全局性:确定在多长时间内不允许重复,1小时内还是一个月内亦或是永久。

此外,GTIS还提供了不同的故障处理策略和重试机制,以此来降低外部存储引擎异常对系统造成的影响。

目前,GTIS已经持续迭代了7个版本,距离第一个版本有近1年之久,先后在美团点评多个项目中稳定运行。

结语

在分布式环境中,操作互斥性问题和幂等性问题非常普遍。经过分析,我们找出了解决这两个问题的基本思路和实现原理,给出了具体的解决方案。

针对操作互斥性问题,常见的做法便是通过分布式锁来处理对共享资源的抢占。分布式锁的实现,很大程度借鉴了多线程和多进程环境中的互斥锁的实现原理。只要满足一些存储方面的基本条件,并且能够解决如网络断开等异常情况,那么就可以实现一个分布式锁。目前已经有基于Zookeeper和Redis等存储引擎的比较典型的分布式锁实现。但是由于单存储引擎的局限,我们开发了基于ZooKeeper和Tair的多引擎分布式锁Cerberus,它具有使用灵活方便等诸多优点,还提供了完善的一键降级方案。

针对操作幂等性问题,我们可以通过防止重复操作来间接的实现接口的幂等性。GTIS提供了一套可靠的解决方法:依赖于存储引擎,通过对不同操作所对应的唯一的内容特性生成一个唯一的全局ID来防止操作重复。

目前Cerberus分布式锁、GTIS都已应用在生产环境并平稳运行。两者提供的解决方案已经能够解决大多数分布式环境中的操作互斥性和幂等性的问题。值得一提的是,分布式锁和GTIS都不是万能的,它们对外部存储系统的强依赖使得在环境不那么稳定的情况下,对可靠性会造成一定的影响。在并发量过高的情况下,如果不能很好的控制锁的粒度,那么使用分布式锁也是不太合适的。总的来说,分布式环境下的业务场景纷繁复杂,要解决互斥性和幂等性问题还需要结合当前系统架构、业务需求和未来演进综合考虑。Cerberus分布式锁和GTIS也会持续不断地迭代更新,提供更多的引擎选择、更高效可靠的实现方式、更简捷的接入流程,以期满足更复杂的使用场景和业务需求。

原文出自:https://tech.meituan.com/archives

Spark Streaming + Elasticsearch构建App异常监控平台

美团点评阅读(2817)

本文已发表在《程序员》杂志2016年10月期。

如果在使用App时遇到闪退,你可能会选择卸载App、到应用商店怒斥开发者等方式来表达不满。但开发者也同样感到头疼,因为崩溃可能意味着用户流失、营收下滑。为了降低崩溃率,进而提升App质量,App开发团队需要实时地监控App异常。一旦发现严重问题,及时进行热修复,从而把损失降到最低。App异常监控平台,就是将这个方法服务化。

低成本

小型创业团队一般会选择第三方平台提供的异常监控服务。但中型以上规模的团队,往往会因为不想把核心数据共享给第三方平台,而选择独立开发。造轮子,首先要考虑的就是成本问题。我们选择了站在开源巨人的肩膀上,如图1所示。

 图1 数据流向示意图

Spark Streaming

每天来自客户端和服务器的大量异常信息,会源源不断的上报到异常平台的Kafka中,因此我们面临的是一个大规模流式数据处理问题。美团点评数据平台提供了Storm和Spark Streaming两种流式计算解决方案。我们主要考虑到团队之前在Spark批处理方面有较多积累,使用Spark Streaming成本较低,就选择了后者。

Elasticsearch

Elasticsearch(后文简称ES),是一个开源搜索引擎。不过在监控平台中,我们是当做“数据库”来使用的。为了降低展示层的接入成本,我们还使用了另一个开源项目ES SQL提供类SQL查询。ES的运维成本,相对 SQL on HBase方案也要低很多。整个项目开发只用了不到700行代码,开发维护成本还是非常低的。那如此“简单”的系统,可用性可以保证吗?

高可用

Spark Streaming + Kafka的组合,提供了“Exactly Once”保证:异常数据经过流式处理后,保证结果数据中(注:并不能保证处理过程中),每条异常最多出现一次,且最少出现一次。保证Exactly Once是实现24/7的高可用服务最困难的地方。在实际生产中会出现很多情况,对Exactly Once的保证提出挑战:

异常重启

Spark提供了Checkpoint功能,可以让程序再次启动时,从上一次异常退出的位置,重新开始计算。这就保证了即使发生异常情况,也可以实现每条数据至少写一次HDFS。再覆写相同的HDFS文件就保证了Exactly Once(注:并不是所有业务场景都允许覆写)。写ES的结果也一样可以保证Exactly Once。你可以把ES的索引,就当成HDFS文件一样来用:新建、删除、移动、覆写。
作为一个24/7运行的程序,在实际生产中,异常是很常见的,需要有这样的容错机制。但是否遇到所有异常,都要立刻挂掉再重启呢?显然不是,甚至在一些场景下,你即使重启了,还是会继续挂掉。我们的解决思路是:尽可能把异常包住,让异常发生时,暂时不影响服务。

 图 2 作业异常重启架构图

如图2所示,包住异常,并不意味可以忽略它,必须把异常收集到Spark Driver端,接入监控(报警)系统,人工判断问题的严重性,确定修复的优先级。
为了更好地掌控Spark Streaming服务的状态,我们还单独开发了一个作业调度(重启)工具。美团点评数据平台安全认证的有效期是7天,一般离线的批处理作业很少会运行超过这个时间,但Spark Streaming作业就不同了,它需要一直保持运行,所以作业只要超过7天就会出现异常。因为没有找到优雅的解决方案,只好粗暴地利用调度工具,每周重启刷新安全认证,来保证服务的稳定。

升级重导

Spark提供了2种读取Kafka的模式:“Receiver-based Approach”和“Direct Approach”。使用Receiver模式,在极端情况下会出现Receiver OOM问题。
使用Direct模式可以避免这个问题。我们使用的就是这种Low-level模式,但在一些情况下需要我们自己维护Kafka Offset:
升级代码:开启Checkpoint后,如果想改动代码,需要清空之前的Checkpoint目录后再启动,否则改动可能不会生效。但当这样做了之后,就会发现另一个问题——程序“忘记”上次读到了哪个位置,因为存储在Checkpoint中的Offset信息也一同被清空了。这种情况下,需要自己用ZooKeeper维护Kafka的Offset。
重导数据:重导数据的场景也是,当希望从之前的某一个时间点开始重新开始计算的时候,显然也需要自己维护时间和Offset的映射关系。
自己维护Offset的成本并不高,所以看起来Checkpoint功能很鸡肋。其实可以有一些特殊用法的,例如,因为Python不需要编译,所以如果使用的是PySpark,可以把主要业务逻辑写在提交脚本的外边,再使用Import调用。这样升级主要业务逻辑代码时,只要重启一下程序即可。网上有不少团队分享过升级代码的“黑科技”,这里不再展开。
实现24/7监控服务,我们不仅要解决纯稳定性问题,还要解决延迟问题。

低延迟

App异常监控,需要保证数据延迟在分钟级。
虽然Spark Streaming有着强大的分布式计算能力,但要满足用户角度的低延迟,可不是单纯的能计算完这么简单。

输入问题

iOS App崩溃时,会生成Crash Log,但其内容是一堆十六进制的内存地址,对开发者来说就是“天书”。只有经过“符号化”的Crash Log,开发者才能看懂。因为符号化需要在Mac环境下进行,而我们的Mac集群资源有限,不能符号化全部Crash Log。即使做了去重等优化,符号化后的数据流还是有延迟。每条异常信息中,包含N维数据,如果不做符号化只能拿到其中的M维。

 图 3 双延迟乱序数据流融合示意图

如图3所示,我们将数据源分为符号化数据流、未符号化数据流,可以看出两个数据流的相对延迟时间T较稳定。如果直接使用符号化后的数据流,那么全部N维数据都会延迟时间T。为了降低用户角度的延迟,我们根据经验加大了时间窗口:先存储未符号化的M维数据,等到拿到对应的符号化数据后,再覆写全部N维数据,这样就只有N-M维数据延迟时间T了。

输出问题

如果Spark Streaming计算结果只是写入HDFS,很难遇到什么性能问题。但你如果想写入ES,问题就来了。因为ES的写入速度大概是每秒1万行,只靠增加Spark Streaming的计算能力,很难突破这个瓶颈。
异常数据源的特点是数据量的波峰波谷相差巨大。由于我们使用了 Direct 模式,不会因为数据量暴涨而挂掉,但这样的“稳定”从用户角度看没有任何意义:短时间内,数据延迟会越来越大,暴增后新出现的异常无法及时报出来。为了解决这个问题,我们制定了一套服务降级方案。

 服务降级方案示意图

如图4所示,我们根据写ES的实际瓶颈K,对每个周期处理的全部数据N使用水塘抽样(比例K/N),保证始终不超过瓶颈。并在空闲时刻使用Spark批处理,将N-K部分从HDFS补写到ES。既然写ES这么慢,那我们为什么还要用ES呢?

高性能

开发者需要在监控平台上分析异常。实际分析场景可以抽象描述为:“实时 秒级 明细 聚合” 数据查询。
我们团队在使用的OLAP解决方案可以分为4种,它们各有各的优势:

  • SQL on HBase方案,例如:Phoenix、Kylin。我们团队从2015年Q1开始,陆续在SEM、SEO生产环境中使用Phoenix、Kylin至今。Phoenix算是一个“全能选手”,但更适合业务模式较固定的场景;Kylin是一个很不错的OLAP产品,但它的问题是不能很好支持实时查询和明细查询,因为它需要离线预聚合。另外,基于其他NoSQL的方案,基本大同小异,如果选择HBase,建议团队在HBase运维方面有一定积累。
  • SQL on HDFS方案,例如:Presto、Spark SQL。这两个产品,因为只能做到亚秒级查询,我们平时多用在数据挖掘的场景中。
  • 时序数据库方案,例如:Druid、OpenTSDB。OpenTSDB是我们旧版App异常监控系统使用过的方案,更适合做系统指标监控。
  • 搜索引擎方案,代表项目有ES。相对上面的3种方案,基于倒排索引的ES非常适合异常分析的场景,可以满足:实时、秒级、明细、聚合,全部4种需求。

ES在实际使用中的表现如何呢?

明细查询

支持明显查询,算是ES的主要特色,但因为是基于倒排索引的,明细查询的结果最多只能取到10000条。在异常分析中,使用明细查询的场景,其实就是追查异常Case,根据条件返回前100条就能满足需求了。例如:已知某设备出现了Crash,直接搜索这个设备的DeviceId就可以看到这个设备最近的异常数据。我们在生产环境中做到了95%的明细查询场景1秒内返回。

聚合查询

面对爆炸的异常信息,一味追求全是不现实,也是没必要的。开发者需要能快速发现关键问题。
因此平台需要支持多维度聚合查询,例如按模块版本机型城市等分类聚合,如图5所示。

 图 5 聚合查询页面截图

不用做优化,ES聚合查询的性能就已经可以满足需求。因此,我们只做了一些小的使用改进,例如:很多异常数据在各个维度的值都是相同的,做预聚合可以提高一些场景下的查询速度。开发者更关心最近48小时发生的异常,分离冷热数据,自动清理历史数据也有助于提升性能。最终在生产环境中,做到了90%的聚合查询场景1秒内返回。

可扩展

异常平台不止要监控App Crash,还要监控服务端的异常、性能等。不同业务的数据维度是不同的,相同业务的数据维度也会不断的变化,如果每次新增业务或维度都需要修改代码,那整套系统的升级维护成本就会很高。

维度

为了增强平台的可扩展性,我们做了全平台联动的动态维度扩展:如果App开发人员在日志中新增了一个“城市”维度,那么他不需要联系监控平台做项目排期,立刻就可以在平台中查询“城市”维度的聚合数据。只需要制定好数据收集、数据处理、数据展示之间的交互协议,做到动态维度扩展就很轻松了。需要注意的是,ES中需要聚合的维度,Index要设置为“not_analyzed”。
想要支持动态字段扩展,还要使用动态模板,样例如下:

{
    "mappings": {
        "es_type_name": {
            "dynamic_templates": [
                {
                    "template_1": {
                        "match": "*log*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "string"
                        }
                    }
                },
                {
                    "template_2": {
                        "match": "*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "string",
                            "index": "not_analyzed"
                        }
                    }
                }
            ]
        }
    }
}

资源

美团点评数据平台提供了Kafka、Spark、ES的集群,整套技术栈在资源上也是分布式可扩展的。
线上集群使用的版本:

  • kafka-0.8.2.0
  • spark-1.5.2
  • elasticsearch-2.1.1

原文出自:https://tech.meituan.com/archives

深度剖析开源分布式监控CAT

美团点评阅读(3105)

CAT(Central Application Tracking)是一个实时和接近全量的监控系统,它侧重于对Java应用的监控,基本接入了美团点评上海侧所有核心应用。目前在中间件(MVC、RPC、数据库、缓存等)框架中得到广泛应用,为美团点评各业务线提供系统的性能指标、健康状况、监控告警等。自2014年开源以来,除了美团点评之外,CAT还在携程、陆金所、猎聘网、找钢网等多家互联网公司生产环境应用,项目的开源地址是 http://github.com/dianping/cat

本文会对CAT整体设计、客户端、服务端等的一些设计思路做详细深入的介绍。

背景介绍

CAT整个产品研发是从2011年底开始的,当时正是大众点评从.NET迁移到Java的核心起步阶段。当初大众点评已经有核心的基础中间件、RPC组件Pigeon、统一配置组件Lion。整体Java迁移已经在服务化的路上。随着服务化的深入,整体Java在线上部署规模逐渐变多,同时,暴露的问题也越来越多。典型的问题有:

  • 大量报错,特别是核心服务,需要花很久时间才能定位。
  • 异常日志都需要线上权限登陆线上机器排查,排错时间长。
  • 有些简单的错误定位都非常困难(一次将线上的库配置到了Beta,花了整个通宵排错)。
  • 很多不了了之的问题怀疑是网络问题(从现在看,内网真的很少出问题)。

虽然那时候也有一些简单的监控工具(比如Zabbix,自己研发的Hawk系统等),可能单个工具在某方面的功能还不错,但整体服务化水平参差不齐、扩展能力相对较弱,监控工具间不能互通互联,使得查找问题根源基本都需要在多个系统之间切换,有时候真的是靠“人品”才能找出根源。

适逢在eBay工作长达十几年的吴其敏加入大众点评成为首席架构师,他对eBay内部应用非常成功的CAL系统有深刻的理解。就在这样天时地利人和的情况下,我们开始研发了大众点评第一代监控系统——CAT。

CAT的原型和理念来源于eBay的CAL系统,最初是吴其敏在大众点评工作期间设计开发的。他之前曾CAT不仅增强了CAL系统核心模型,还添加了更丰富的报表。

整体设计

监控整体要求就是快速发现故障、快速定位故障以及辅助进行程序性能优化。为了做到这些,我们对监控系统的一些非功能做了如下的要求:

  • 实时处理:信息的价值会随时间锐减,尤其是事故处理过程中。
  • 全量数据:最开始的设计目标就是全量采集,全量的好处有很多。
  • 高可用:所有应用都倒下了,需要监控还站着,并告诉工程师发生了什么,做到故障还原和问题定位。
  • 故障容忍:CAT本身故障不应该影响业务正常运转,CAT挂了,应用不该受影响,只是监控能力暂时减弱。
  • 高吞吐:要想还原真相,需要全方位地监控和度量,必须要有超强的处理吞吐能力。
  • 可扩展:支持分布式、跨IDC部署,横向扩展的监控系统。
  • 不保证可靠:允许消息丢失,这是一个很重要的trade-off,目前CAT服务端可以做到4个9的可靠性,可靠系统和不可靠性系统的设计差别非常大。

CAT从开发至今,一直秉承着简单的架构就是最好的架构原则,主要分为三个模块:CAT-client、CAT-consumer、CAT-home。

  • Cat-client 提供给业务以及中间层埋点的底层SDK。
  • Cat-consumer 用于实时分析从客户端提供的数据。
  • Cat-home 作为用户给用户提供展示的控制端。

在实际开发和部署中,Cat-consumer和Cat-home是部署在一个JVM内部,每个CAT服务端都可以作为consumer也可以作为home,这样既能减少整个层级结构,也可以增加系统稳定性。
image

上图是CAT目前多机房的整体结构图,图中可见:

  • 路由中心是根据应用所在机房信息来决定客户端上报的CAT服务端地址,目前美团点评有广州、北京、上海三地机房。
  • 每个机房内部都有独立的原始信息存储集群HDFS。
  • CAT-home可以部署在一个机房也可以部署在多个机房,在最后做展示的时候,home会从consumer中进行跨机房的调用,将所有的数据合并展示给用户。
  • 实际过程中,consumer、home以及路由中心都是部署在一起的,每个服务端节点都可以充当任何一个角色。

客户端设计

客户端设计是CAT系统设计中最为核心的一个环节,客户端要求是做到API简单、高可靠性能,无论在任何场景下都不能影响客业务性能,监控只是公司核心业务流程一个旁路环节。CAT核心客户端是Java,也支持Net客户端,近期公司内部也在研发其他多语言客户端。以下客户端设计及细节均以Java客户端为模板。

设计架构

CAT客户端在收集端数据方面使用ThreadLocal(线程局部变量),是线程本地变量,也可以称之为线程本地存储。其实ThreadLocal的功用非常简单,就是为每一个使用该变量的线程都提供一个变量值的副本,属于Java中一种较为特殊的线程绑定机制,每一个线程都可以独立地改变自己的副本,不会和其它线程的副本冲突。

在监控场景下,为用户提供服务都是Web容器,比如tomcat或者Jetty,后端的RPC服务端比如Dubbo或者Pigeon,也都是基于线程池来实现的。业务方在处理业务逻辑时基本都是在一个线程内部调用后端服务、数据库、缓存等,将这些数据拿回来再进行业务逻辑封装,最后将结果展示给用户。所以将所有的监控请求作为一个监控上下文存入线程变量就非常合适。

image

如上图所示,业务执行业务逻辑的时候,就会把此次请求对应的监控存放于线程上下文中,存于上下文的其实是一个监控树的结构。在最后业务线程执行结束时,将监控对象存入一个异步内存队列中,CAT有个消费线程将队列内的数据异步发送到服务端。

API设计

监控API定义往往取决于对监控或者性能分析这个领域的理解,监控和性能分析所针对的场景有如下几种:

  • 一段代码的执行时间,一段代码可以是URL执行耗时,也可以是SQL的执行耗时。
  • 一段代码的执行次数,比如Java抛出异常记录次数,或者一段逻辑的执行次数。
  • 定期执行某段代码,比如定期上报一些核心指标:JVM内存、GC等指标。
  • 关键的业务监控指标,比如监控订单数、交易额、支付成功率等。

在上述领域模型的基础上,CAT设计自己核心的几个监控对象:Transaction、Event、Heartbeat、Metric。

一段监控API的代码示例如下:
image

序列化和通信

序列化和通信是整个客户端包括服务端性能里面很关键的一个环节。

  • CAT序列化协议是自定义序列化协议,自定义序列化协议相比通用序列化协议要高效很多,这个在大规模数据实时处理场景下还是非常有必要的。
  • CAT通信是基于Netty来实现的NIO的数据传输,Netty是一个非常好的NIO开发框架,在这边就不详细介绍了。

客户端埋点

日志埋点是监控活动的最重要环节之一,日志质量决定着监控质量和效率。当前CAT的埋点目标是以问题为中心,像程序抛出exception就是典型问题。我个人对问题的定义是:不符合预期的就可以算问题,比如请求未完成、响应时间快了慢了、请求TPS多了少了、时间分布不均匀等等。

在互联网环境中,最突出的问题场景,突出的理解是:跨越边界的行为。包括但不限于:

  • HTTP/REST、RPC/SOA、MQ、Job、Cache、DAL;
  • 搜索/查询引擎、业务应用、外包系统、遗留系统;
  • 第三方网关/银行, 合作伙伴/供应商之间;
  • 各类业务指标,如用户登录、订单数、支付状态、销售额。

遇到的问题

通常Java客户端在业务上使用容易出问题的地方就是内存,另外一个是CPU。内存往往是内存泄露,占用内存较多导致业务方GC压力增大; CPU开销最终就是看代码的性能。

以前我们遇到过一个极端的例子,我们一个业务请求做餐饮加商铺的销售额,业务一般会通过for循环所有商铺的分店,结果就造成内存OOM了,后来发现这家店是肯德基,有几万分店,每个循环里面都会有数据库连接。在正常场景下,ThreadLocal内部的监控一个对象就存在几万个节点,导致业务Oldgc特别严重。所以说框架的代码是不能想象业务方会怎么用你的代码,需要考虑到任何情况下都有出问题的可能。

在消耗CPU方面我们也遇到一个case:在某个客户端版本,CAT本地存储当前消息ID自增的大小,客户端使用了MappedByteBuffer这个类,这个类是一个文件内存映射,测试下来这个类的性能非常高,我们仅仅用这个存储了几个字节的对象,正常情况理论上不会有任何问题。在一次线上场景下,很多业务线程都block在这个上面,结果发现当本身这台机器IO存在瓶颈时候,这个也会变得很慢。后来的优化就是把这个IO的操作异步化,所以客户端需要尽可能异步化,异步化序列化、异步化传输、异步化任何可能存在时间延迟的代码操作

服务端设计

服务端主要的问题是大数据的实时处理,目前后端CAT的计算集群大约35台物理机,存储集群大约35台物理机,每天处理了约100TB的数据量。线上单台机器高峰期大约是110MB/s,接近千兆网打满。

下面我重点讲下CAT服务端一些设计细节。

架构设计

在最初的整体介绍中已经画了架构图,这边介绍下单机的consumer中大概的结构如下:

image

如上图,CAT服务端在整个实时处理中,基本上实现了全异步化处理。

  • 消息接受是基于Netty的NIO实现。
  • 消息接受到服务端就存放内存队列,然后程序开启一个线程会消费这个消息做消息分发。
  • 每个消息都会有一批线程并发消费各自队列的数据,以做到消息处理的隔离。
  • 消息存储是先存入本地磁盘,然后异步上传到HDFS文件,这也避免了强依赖HDFS。

当某个报表处理器处理来不及时候,比如Transaction报表处理比较慢,可以通过配置支持开启多个Transaction处理线程,并发消费消息。

image

实时分析

CAT服务端实时报表分析是整个监控系统的核心,CAT重客户端采集的是是原始的logview,目前一天大约有1000亿的消息,这些原始的消息太多了,所以需要在这些消息基础上实现丰富报表,来支持业务问题及性能分析的需要。

CAT是根据日志消息的特点(比如只读特性)和问题场景,量身定做的,它将所有的报表按消息的创建时间,一小时为单位分片,那么每小时就产生一个报表。当前小时报表的所有计算都是基于内存的,用户每次请求即时报表得到的都是最新的实时结果。对于历史报表,因为它是不变的,所以实时不实时也就无所谓了。

CAT基本上所有的报表模型都可以增量计算,它可以分为:计数、计时和关系处理三种。计数又可以分为两类:算术计数和集合计数。典型的算术计数如:总个数(count)、总和(sum)、均值(avg)、最大/最小(max/min)、吞吐(tps)和标准差(std)等,其他都比较直观,标准差稍微复杂一点,大家自己可以推演一下怎么做增量计算。那集合运算,比如95线(表示95%请求的完成时间)、999线(表示99.9%请求的完成时间),则稍微复杂一些,系统开销也更大一点。

报表建模

CAT每个报表往往有多个维度,以transaction报表为例,它有5个维度,分别是应用、机器、Type、Name和分钟级分布情况。如果全维度建模,虽然灵活,但开销将会非常之大。CAT选择固定维度建模,可以理解成将这5个维度组织成深度为5的树,访问时总是从根开始,逐层往下进行。

CAT服务端为每个报表单独分配一个线程,所以不会有锁的问题,所有报表模型都是非线程安全的,其数据是可变的。这样带来的好处是简单且低开销。

CAT报表建模是使用自研的Maven Plugin自动生成的。所有报表是可合并和裁剪的,可以轻易地将2个或多个报表合并成一个报表。在报表处理代码中,CAT大量使用访问者模式(visitor pattern)。

性能分析报表

image

故障发现报表

  • 实时业务指标监控 :核心业务都会定义自己的业务指标,这不需要太多,主要用于24小时值班监控,实时发现业务指标问题,图中一个是当前的实际值,一个是基准值,就是根据历史趋势计算的预测值。如下图就是当时的情景,能直观看到支付业务出问题的故障。

    image

  • 系统报错大盘。

  • 实时数据库大盘、服务大盘、缓存大盘等。

存储设计

CAT系统的存储主要有两块:

  • CAT的报表的存储。
  • CAT原始logview的存储。

报表是根据logview实时运算出来的给业务分析用的报表,默认报表有小时模式、天模式、周模式以及月模式。CAT实时处理报表都是产生小时级别统计,小时级报表中会带有最低分钟级别粒度的统计。天、周、月等报表都是在小时级别报表合并的结果报表。

原始logview存储一天大约100TB的数据量,因为数据量比较大所以存储必须要要压缩,本身原始logview需要根据Message-ID读取,所以存储整体要求就是批量压缩以及随机读。在当时场景下,并没有特别合适成熟的系统以支持这样的特性,所以我们开发了一种基于文件的存储以支持CAT的场景,在存储上一直是最难的问题,我们一直在这块持续的改进和优化。

消息ID的设计

CAT每个消息都有一个唯一的ID,这个ID在客户端生成,后续都通过这个ID在进行消息内容的查找。典型的RPC消息串起来的问题,比如A调用B的时候,在A这端生成一个Message-ID,在A调用B的过程中,将Message-ID作为调用传递到B端,在B执行过程中,B用context传递的Message-ID作为当前监控消息的Message-ID。

CAT消息的Message-ID格式ShopWeb-0a010680-375030-2,CAT消息一共分为四段:

  • 第一段是应用名shop-web。
  • 第二段是当前这台机器的IP的16进制格式,01010680表示10.1.6.108。
  • 第三段的375030,是系统当前时间除以小时得到的整点数。
  • 第四段的2,是表示当前这个客户端在当前小时的顺序递增号。

存储数据的设计

消息存储是CAT最有挑战的部分。关键问题是消息数量多且大,目前美团点评每天处理消息1000亿左右,大小大约100TB,单物理机高峰期每秒要处理100MB左右的流量。CAT服务端基于此流量做实时计算,还需要将这些数据压缩后写入磁盘。

整体存储结构如下图:

image

CAT在写数据一份是Index文件,一份是Data文件.

  • Data文件是分段GZIP压缩,每个分段大小小于64K,这样可以用16bits可以表示一个最大分段地址。
  • 一个Message-ID都用需要48bits的大小来存索引,索引根据Message-ID的第四段来确定索引的位置,比如消息Message-ID为ShopWeb-0a010680-375030-2,这条消息ID对应的索引位置为2*48bits的位置。
  • 48bits前面32bits存数据文件的块偏移地址,后面16bits存数据文件解压之后的块内地址偏移。
  • CAT读取消息的时候,首先根据Message-ID的前面三段确定唯一的索引文件,在根据Message-ID第四段确定此Message-ID索引位置,根据索引文件的48bits读取数据文件的内容,然后将数据文件进行GZIP解压,在根据块内便宜地址读取出真正的消息内容。

服务端设计总结

CAT在分布式实时方面,主要归结于以下几点因素:

  • 去中心化,数据分区处理。
  • 基于日志只读特性,以一个小时为时间窗口,实时报表基于内存建模和分析,历史报表通过聚合完成。
  • 基于内存队列,全面异步化、单线程化、无锁设计。
  • 全局消息ID,数据本地化生产,集中式存储。
  • 组件化、服务化理念。

总结感悟

最后我们再花一点点时间来讲一下我们在实践里做的一些东西。

一、MVP版本,Demo版本用了1个月,MVP版本用了3个月。

为什么强调MVP版本?因为做这个项目需要老板和业务的支持。大概在2011年左右,我们整个生产环境估计也有一千台机器(虚拟机),一旦出现问题就到运维那边看日志,看日志的痛苦大家都应该理解,这时候发现一台机器核心服务出错,可能会导致更多的问题。我们就做了MVP版本解决这个问题,当时我们大概做了两个功能:一个是实时知道所有的API接口访问量成功率等;第二是实时能在CAT平台上看到异常日志。这里我想说的是MVP版本不要做太多内容,但是在做一个产品的时候必须从MVP版本做起,要做一些最典型特别亮眼的功能让大家支持你。

二、数据质量。数据质量是整个监控体系里面非常关键,它决定你最后的监控报表质量。所以我们要和跟数据库框架、缓存框架、RPC框架、Web框架等做深入的集成,让业务方便收集以及看到这些数据。

三、单机开发环境,这也是我们认为对整个项目开发效率提升最重要的一点。单机开发环境实际上就是说你在一台机器里可以把你所有的项目都启起来。如果你在一个单机环境下把所有东西启动起来,你就会想方设法地知道我依赖的服务挂了我怎么办?比如CAT依赖了HDFS。单机开发环境除了大幅度提高你的项目开发效率之外,还能提升你整个项目的可靠性。

四、最难的事情是项目上线推动。CAT整个项目大概有两三个人,当时白天都是支持业务上线,培训,晚上才能code,但是一旦随着产品和完善以及业务使用逐渐变多,一些好的产品后面会形成良性循环,推广就会变得比较容易。

五、开放生态。公司越大监控的需求越多,报表需求也更多,比如我们美团点评,产品有很多报表,整个技术体系里面也有很多报表非常多的自定义报表,很多业务方都提各自的需求。最后我们决定把整个CAT系统里面所有的数据都作为API暴露出去,这些需求并不是不能支持,而是这事情根本是做不完的。美团点评内部下游有很多系统依赖CAT的数据,来做进一步的报表展示。

CAT项目从2011年开始做,到现在整个生产环境大概有三千应用,监控的服务端从零到几千,再到今天的两万多的规模,整个项目是从历时看起来是一个五年多的项目,但即使是做了五年多的这样一个项目,目前还有很多的需求需要开发。这边也打个广告,我们团队急缺人,欢迎对监控系统研发有兴趣的同学加入,请联系yong.you@dianping.com.

原文出自:https://tech.meituan.com/archives

Storm 的可靠性保证测试

美团点评阅读(2780)

Storm 是一个分布式的实时计算框架,可以很方便地对流式数据进行实时处理和分析,能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成,从而为业务方决策提供实时的数据支持。

在美团点评公司内部,实时计算主要应用场景包括实时日志解析、用户行为分析、实时消息推送、消费趋势展示、实时新客判断、实时活跃用户数统计等。这些数据提供给各事业群,并作为他们实时决策的有力依据,弥补了离线计算“T+1”的不足。

在实时计算中,用户不仅仅关心时效性的问题,同时也关心消息处理的成功率。本文将通过实验验证 Storm 的消息可靠性保证机制,文章分为消息保证机制、测试目的、测试环境、测试场景以及总结等五节。

Storm 的消息保证机制

Storm 提供了三种不同层次的消息保证机制,分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理。

消息完全处理

每个从 Spout(Storm 中数据源节点)发出的 Tuple(Storm 中的最小消息单元)可能会生成成千上万个新的 Tuple,形成一棵 Tuple 树,当整棵 Tuple 树的节点都被成功处理了,我们就说从 Spout 发出的 Tuple 被完全处理了。 我们可以通过下面的例子来更好地诠释消息被完全处理这个概念:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KafkaSpout(spoutConfig), spoutNum);
builder.setBolt("split", new SplitSentence(), 10)
    .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
    .fieldsGrouping("split", new Fields("word"));

这个 Topology 从 Kafka(一个开源的分布式消息队列)读取信息发往下游,下游的 Bolt 将收到的句子分割成单独的单词,并进行计数。每一个从 Spout 发送出来的 Tuple 会衍生出多个新的 Tuple,从 Spout 发送出来的 Tuple 以及后续衍生出来的 Tuple 形成一棵 Tuple 树,下图是一棵 Tuple 树示例:

Tuple 树示例图

上图中所有的 Tuple 都被成功处理了,我们才认为 Spout 发出的 Tuple 被完全处理。如果在一个固定的时间内(这个时间可以配置,默认为 30 秒),有至少一个 Tuple 处理失败或超时,则认为整棵 Tuple 树处理失败,即从 Spout 发出的 Tuple 处理失败。

如何实现不同层次的消息保证机制

spout_bolt_acker

Tuple 的完全处理需要 Spout、Bolt 以及 Acker(Storm 中用来记录某棵 Tuple 树是否被完全处理的节点)协同完成,如上图所示。从 Spout 发送 Tuple 到下游,并把相应信息通知给 Acker,整棵 Tuple 树中某个 Tuple 被成功处理了都会通知 Acker,待整棵 Tuple 树都被处理完成之后,Acker 将成功处理信息返回给 Spout;如果某个 Tuple 处理失败,或者超时,Acker 将会给 Spout 发送一个处理失败的消息,Spout 根据 Acker 的返回信息以及用户对消息保证机制的选择判断是否需要进行消息重传。

Storm 提供的三种不同消息保证机制中。利用 Spout、Bolt 以及 Acker 的组合我们可以实现 At Most Once 以及 At Least Once 语义,Storm 在 At Least Once 的基础上进行了一次封装(Trident),从而实现 Exactly Once 语义。

Storm 的消息保证机制中,如果需要实现 At Most Once 语义,只需要满足下面任何一条即可:

  • 关闭 ACK 机制,即 Acker 数目设置为 0
  • Spout 不实现可靠性传输
    • Spout 发送消息是使用不带 message ID 的 API
    • 不实现 fail 函数
  • Bolt 不把处理成功或失败的消息发送给 Acker

如果需要实现 At Least Once 语义,则需要同时保证如下几条:

  • 开启 ACK 机制,即 Acker 数目大于 0
  • Spout 实现可靠性传输保证
    • Spout 发送消息时附带 message 的 ID
    • 如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数
  • Bolt 在处理成功或失败后需要调用相应的方法通知 Acker

实现 Exactly Once 语义,则需要在 At Least Once 的基础上进行状态的存储,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现。

下图中,每种消息保证机制中左边的字母表示上游发送的消息,右边的字母表示下游接收到的消息。从图中可以知道,At Most Once 中,消息可能会丢失(上游发送了两个 A,下游只收到一个 A);At Least Once 中,消息不会丢失,可能重复(上游只发送了一个 B ,下游收到两个 B);Exactly Once 中,消息不丢失、不重复,因此需要在 At Least Once 的基础上保存相应的状态,表示上游的哪些消息已经成功发送到下游,防止同一条消息发送多次给下游的情况。

三种消息保证机制比较图

测试目的

Storm 官方提供 At Most Once、At Least Once 以及 Exactly Once 三种不同层次的消息保证机制,我们希望通过相关测试,达到如下目的:

  • 三种消息保证机制的表现,是否与官方的描述相符;
  • At Most Once 语义下,消息的丢失率和什么有关系、关系如何;
  • At Least Once 语义下,消息的重复率和什么有关系、关系如何。

测试环境

本文的测试环境如下: 每个 worker(worker 为一个 物理 JVM 进程,用于运行实际的 Storm 作业)分配 1 CPU 以及 1.6G 内存。Spout、Bolt、Acker 分别跑在单独的 worker 上。并通过在程序中控制抛出异常以及人工 Kill Spout/Bolt/Acker 的方式来模拟实际情况中的异常情况。

三种消息保证机制的测试均由 Spout 从 Kafka 读取测试数据,经由相应 Bolt 进行处理,然后发送到 Kafka,并将 Kafka 上的数据同步到 MySQL 方便最终结果的统计,如下图所示:

测试流程示意图

测试数据为 Kafka 上顺序保存的一系列纯数字,数据量分别有十万、五十万、一百万等,每个数字在每个测试样例中出现且仅出现一次。

测试场景

对于三种不同的消息保证机制,我们分别设置了不同的测试场景,来进行充分的测试。其中为了保证 Spout/Bolt/Acker 发生异常的情况下不影响其他节点,在下面的测试中,所有的节点单独运行在独立的 Worker 上。

At Most Once

从背景中可以得知,如果希望实现 At Most Once 语义,将 Acker 的数目设置为 0 即可,本文的测试过程中通过把设置 Acker 为 0 来进行 At Most Once 的测试。

输入数据

保存在 Kafka 上的一系列纯数字,数据量从十万到五百万不等,每个测试样例中,同一个数字在 Kafka 中出现且仅出现一次。

测试结果

异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量
050000050000000%0
01000000100000000%0
02000000200000000%0
03000000300000000%0
异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量
1300000027749402250607.50%0
23000000230708769291323.09%0
33000000208282391717730.57%0
430000001420725157927552.64%0

结论

不发生异常的情况下,消息能够不丢不重;Bolt 发生异常的情况下,消息会丢失,不会重复,其中消息的丢失数目异常次数正相关。与官方文档描述相符,符合预期。

At Least Once

为了实现 At Least Once 语义,需要 Spout、Bolt、Acker 进行配合。我们使用 Kafka-Spout 并通过自己管理 offset 的方式来实现可靠的 Spout;Bolt 通过继承 BaseBasicBolt,自动帮我们建立 Tuple 树以及消息处理之后通知 Acker;将 Acker 的数目设置为 1,即打开 ACK 机制,这样整个 Topology 即可提供 At Least Once 的语义。

测试数据

Kafka 上保存的十万到五十万不等的纯数字,其中每个测试样例中,每个数字在 Kafka 中出现且仅出现一次。

测试结果

Acker 发生异常的情况

异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量最大积压量
010000010000002000(默认值)
020000020000002000
030000030000002000
040000040000002000
异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量最大积压量
11000001000002200002000
21000001000002400102000
31000001000002600002000
41000001000002800002000

Spout 发生异常的情况

异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
01000001000000
02000002000000
03000003000000
04000004000000
异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
1100000100000220520
2100000100000244140
4100000100000290080
6100000100000296900
316750

Bolt 发生异常的情况

调用 emit 函数之前发生异常

异常次数结果集中不重复的 Tuple 数数据重复的次数 (>1)出现重复的 Tuple 数数据丢失量
01000000
02000000
03000000
04000000
异常次数结果集中不重复的 Tuple 数数据重复的次数 (>1)出现重复的 Tuple 数数据丢失量
11000000
21000000
41000000
81000000
101000000

调用 emit 函数之后发生异常

异常次数结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
01000000
02000000
03000000
04000000
异常次数结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
1100000220
2100000230
4100000250
8100000290
101000002110

结论

从上面的表格中可以得到,消息不会丢失,可能发生重复,重复的数目与异常的情况相关。

  • 不发生任何异常的情况下,消息不会重复不会丢失。
  • Spout 发生异常的情况下,消息的重复数目约等于 spout.max.pending(Spout 的配置项,每次可以发送的最多消息条数) * NumberOfException(异常次数)。
  • Acker 发生异常的情况下,消息重复的数目等于 spout.max.pending * NumberOfException。
  • Bolt 发生异常的情况:
    • emit 之前发生异常,消息不会重复。
    • emit 之后发生异常,消息重复的次数等于异常的次数。

结论与官方文档所述相符,每条消息至少发送一次,保证数据不会丢失,但可能重复,符合预期。

Exactly Once

对于 Exactly Once 的语义,利用 Storm 中的 Trident 来实现。

测试数据

Kafka 上保存的一万到一百万不等的数字,每个数字在每次测试样例中出现且仅出现一次。

测试结果

Spout 发生异常情况

异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

Acker 发生异常的情况

异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

Bolt 发生异常的情况

异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

结论

在所有情况下,最终结果集中的消息不会丢失,不会重复,与官方文档中的描述相符,符合预期。

总结

对 Storm 提供的三种不同消息保证机制,用户可以根据自己的需求选择不同的消息保证机制。

不同消息可靠性保证的使用场景

对于 Storm 提供的三种消息可靠性保证,优缺点以及使用场景如下所示:

可靠性保证层次优点缺点使用场景
At most once处理速度快数据可能丢失都处理速度要求高,且对数据丢失容忍度高的场景
At least once数据不会丢失数据可能重复不能容忍数据丢失,可以容忍数据重复的场景
Exactly once数据不会丢失,不会重复处理速度慢对数据不丢不重性质要求非常高,且处理速度要求没那么高,比如支付金额

如何实现不同层次的消息可靠性保证

对于 At Least Once 的保证需要做如下几步:

  1. 需要开启 ACK 机制,即 Topology 中的 Acker 数量大于零;
  2. Spout 是可靠的。即 Spout 发送消息的时候需要附带 msgId,并且实现失败消息重传功能(fail 函数 ,可以参考下面的 Spout 代码);
  3. Bolt 在发送消息时,需要调用 emit(inputTuple, outputTuple)进行建立 anchor 树(参考下面建立 anchor 树的代码),并且在成功处理之后调用 ack ,处理失败时调用 fail 函数,通知 Acker。

不满足以上三条中任意一条的都只提供 At Most Once 的消息可靠性保证,如果希望得到 Exactly Once 的消息可靠性保证,可以使用 Trident 进行实现。

不同层次的可靠性保证如何实现

如何实现可靠的 Spout

实现可靠的 Spout 需要在 nextTuple 函数中发送消息时,调用带 msgID 的 emit 方法,然后实现失败消息的重传(fail 函数),参考如下示例:

/**
     * 想实现可靠的 Spout,需要实现如下两点
     * 1. 在 nextTuple 函数中调用 emit 函数时需要带一个     msgId,用来表示当前的消息(如果消息发送失败会用 msgId 作为参数回调 fail 函数)
     * 2. 自己实现 fail 函数,进行重发(注意,在 storm 中没有 msgId 和消息的对应关系,需要自己进行维护)
     */
public void nextTuple() {
    //设置 msgId 和 Value 一样,方便 fail 之后重发
    collector.emit(new Values(curNum + "", round +     ""), curNum + ":" + round);
}

@Override
public void fail(Object msgId) {//消息发送失败时的回调函数
String tmp = (String)msgId;   //上面我们设置了 msgId 和消息相同,这里通过 msgId 解析出具体的消息
String[] args = tmp.split(":");

//消息进行重发
collector.emit(new Values(args[0], args[1]), msgId);
}

如何实现可靠的 Bolt

Storm 提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt,都可以实现可靠性消息传递,不过 BaseRichBolt 需要自己做很多周边的事情(建立 anchor 树,以及手动 ACK/FAIL 通知 Acker),使用场景更广泛,而 BaseBasicBolt 则由 Storm 帮忙实现了很多周边的事情,实现起来方便简单,但是使用场景单一。如何用这两个 Bolt 实现(不)可靠的消息传递如下所示:

//BaseRichBolt 实现不可靠消息传递
public class SplitSentence extends BaseRichBolt {//不建立 anchor 树的例子
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            _collector.emit(new Values(word));  // 不建立 anchor 树
        }
        _collector.ack(tuple);          //手动 ack,如果不建立 anchor 树,是否 ack 是没有区别的,这句可以进行注释
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

//BaseRichBolt 实现可靠的 Bolt
public class SplitSentence extends BaseRichBolt {//建立 anchor 树以及手动 ack 的例子
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            _collector.emit(tuple, new Values(word));  // 建立 anchor 树
        }
        _collector.ack(tuple);          //手动 ack,如果想让 Spout 重发该 Tuple,则调用 _collector.fail(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

下面的示例会可以建立 Multi-anchoring
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

//BaseBasicBolt 是吸纳可靠的消息传递
public class SplitSentence extends BaseBasicBolt {//自动建立 anchor,自动 ack
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

Trident

在 Trident 中,Spout 和 State 分别有三种状态,如下图所示:

Trident Spout 和 State 的状态图

其中表格中的 Yes 表示相应的 Spout 和 State 组合可以实现 Exactly Once 语义,No 表示相应的 Spout 和 State 组合不保证 Exactly Once 语义。下面的代码是一个 Trident 示例:

     OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);   //Opaque Spout
    //TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf);   //Transaction Spout

    TridentTopology topology = new TridentTopology();
    String spoutTxid = Utils.kafkaSpoutGroupIdBuilder(topologyConfig.kafkaSrcTopic, topologyConfig.topologyName);
    Stream stream = topology.newStream(spoutTxid, spout)
            .name("new stream")
            .parallelismHint(1);

    // kafka config
    KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();      //KafkaProducerConfig 仅对 kafka 相关配置进行了封装,具体可以参考 TridentKafkaStateFactory2(Map<String, String> config)
    Map<String, String> kafkaConfigs = kafkaProducerConfig.loadFromConfig(topologyConfig);
    TridentToKafkaMapper tridentToKafkaMapper = new TridentToKafkaMapper();  //TridentToKafkaMapper 继承自 TridentTupleToKafkaMapper<String, String>,实现 getMessageFromTuple 接口,该接口中返回 tridentTuple.getString(0);

    String  dstTopic = "test__topic_for_all";

    TridentKafkaStateFactory2 stateFactory = new TridentKafkaStateFactory2(kafkaConfigs);
    stateFactory.withTridentTupleToKafkaMapper(tridentToKafkaMapper);
    stateFactory.withKafkaTopicSelector(new DefaultTopicSelector(dstTopic));

    stream.each(new Fields("bytes"), new AddMarkFunction(), new Fields("word")) //从spout 出来数据是一个 bytes 类型的数据,第二个是参数是自己的处理函数,第三个参数是处理函数的输出字段
            .name("write2kafka")
            .partitionPersist(stateFactory         //将数据写入到 Kafka 中,可以保证写入到 Kafka 的数据是 exactly once 的
                    , new Fields("word")
                    , new TridentKafkaUpdater())
            .parallelismHint(1);

原文出自:https://tech.meituan.com/archives

分布式会话跟踪系统架构设计与实践

美团点评阅读(2805)

本文整理自美团点评技术沙龙第08期:大规模集群的服务治理设计与实践。

美团点评技术沙龙由美团点评技术团队主办,每月一期。每期沙龙邀请美团点评及其它互联网公司的技术专家分享来自一线的实践经验,覆盖各主要技术领域。

目前沙龙会分别在北京、上海和厦门等地举行,要参加下一次最新沙龙活动?赶快关注微信公众号“美团点评技术团队”。

这期沙龙主要内容有:分布式服务通信框架及服务治理系统、分布式监控系统实践、分布式会话跟踪系统架构设计与实践,特邀美恰CTO讲解时下热门话题“微服务”。其中既包括关键系统设计、在美团点评内部的实践经验,也包括一些项目在业界开源的运营实践。

前言

随着美团点评的业务发展,公司的分布式系统变得越来越复杂,我们亟需一个工具能够梳理内部服务之间的关系,感知上下游服务的形态。比如一次请求的流量从哪个服务而来、最终落到了哪个服务中去?服务之间是RPC调用,还是HTTP调用?一次分布式请求中的瓶颈节点是哪一个,等等。

简介

MTrace,美团点评内部的分布式会话跟踪系统,其核心理念就是调用链:通过一个全局的ID将分布在各个服务节点上的同一次请求串联起来,还原原有的调用关系、追踪系统问题、分析调用数据、统计系统指标。这套系统借鉴了2010年Google发表的一篇论文《dapper》,并参考了Twitter的Zipkin以及阿里的Eagle Eye的实现。
那么我们先来看一下什么是调用链,调用链其实就是将一次分布式请求还原成调用链路。显式的在后端查看一次分布式请求的调用情况,比如各个节点上的耗时、请求具体打到了哪台机器上、每个服务节点的请求状态,等等。它能反映出一次请求中经历了多少个服务以及服务层级等信息(比如你的系统A调用B,B调用C,那么这次请求的层级就是3),如果你发现有些请求层级大于10,那这个服务很有可能需要优化了。

网络优化

如上图所示,红框内显示了一次分布式请求经过各个服务节点的具体IP,通过该IP就可以查询一次分布式请求是否有跨机房调用等信息,优化调用链路的网络结构。

瓶颈查询

再比如上图,红框部分显示的是系统调用的瓶颈节点,由于该节点的耗时,导致了整个系统调用的耗时延长,因此该节点需要进行优化,进而优化整个系统的效率。这种问题通过调用链路能很快发现下游服务的瓶颈节点;但是假如没有这样的系统,我们会怎样做呢?首先我会发现下游服务超时造成了我的服务超时,这时我会去找这个下游服务的负责人,然后该负责人发现也不是他自己服务的问题,而是他们调用了其他人的接口造成的问题,紧接着他又去找下游的服务负责人。我们都知道跨部门之间的沟通成本很高的,这么找下去会花费大量的不必要时间,而有了MTrace之后,你只需要点开链路就能发现超时问题的瓶颈所在。

优化链路

我们再来看下上面这张图,红框部分都是同一个接口的调用,一次请求调用相同的接口10几次甚至是几十次,这是我们不想看到的事情,那么整个系统能不能对这样的请求进行优化,比如改成批量接口或者提高整个系统调用的并行度?在美团点评内部我们会针对这样的链路进行筛选分析,然后提供给业务方进行优化。

异常log绑定

通过MTrace不仅能做上述这些事情,通过它的特性,还能携带很多业务感兴趣的数据。因为MTrace可以做到数据和一次请求的绑定以及数据在一次请求的网络中传递。比如一些关键的异常log,一般服务的异常log很有可能是因为上游或者下游的异常造成的,那就需要我们手动地对各个不同服务的异常log做mapping。看这次的异常log对应到上游服务的哪个log上,是不是因为上游传递的一些参数造成了该次异常?而通过MTrace就可以将请求的参数、异常log等信息通过traceId进行绑定,很容易地就把这些信息聚合到了一起,方便业务端查询问题。

透明传输数据

业务端往往有这样的需求,它希望一些参数能在一次分布式请求一直传递下去,并且可以在不同的RPC中间件间传递。MTrace对该类需求提供了两个接口:

put(map<String, String> data)
putOnce(map<String, String> data)
  • put 接口:参数可以在一次分布式请求中一直传递。
  • putOnce 接口:参数在一次分布式请求中只传递一级。

如下图所示

  • 左侧绿色部分是put接口,service中调用了put接口传递了uid=123456这个参数,它会在网络中一直传递,可以在服务A中通过get(“uid”)的方式获取参数值,也可以在服务C中通过get(“uid”)的方式获取参数值。
  • 右侧蓝色部分是putOnce接口,service中调用了putOnce接口传递pid=11111,它只会传递一级,可以在服务B中通过get(“pid”)的方式获取参数值,但是在服务D中就获取不到pid的值了。

以上的两种接口可以用于业务自定义传递数据,比如通过传递一个服务标识,用于AB test,下游的所有服务获取到test的标识就会走test的策略,即上游服务可以传递一些参数,控制所有下游服务的逻辑。当然业务也可以通过该接口传递一些临时性的数据。

系统架构

主要分为三层:数据埋点上报、数据收集计算、数据前端展示。

基本概念

traceId

全局唯一,64位整数,用于标识一次分布式请求,会在RPC调用的网络中传递。

spanId

签名方式生成:0, 0.1, 0.1.1, 0.2。用于标识一次RPC在分布式请求中的位置,比如0.2就是0节点服务调用的第二个服务。

annotation

业务端自定义埋点,业务感兴趣的想上传到后端的数据,比如该次请求的用户ID等。

数据埋点

埋点SDK

提供统一的SDK,在各个中间件中埋点,生成traceID等核心数据,上报服务的调用数据信息。

  • 生成调用上下文;
  • 同步调用上下文存放在ThreadLocal, 异步调用通过显式调用API的方式支持;
  • 网络中传输关键埋点数据,用于中间件间的数据传递,支持Thrift, HTTP协议。

业内有些系统是使用注解的方式实现的埋点,这种方式看似很优雅,但是需要业务方显式依赖一些AOP库,这部分很容易出现问题,因为AOP方式太过透明,导致查问题很麻烦,而且业务方配置的东西越多越容易引起一些意想不到的问题,所以我们的经验是尽量在各个统一的中间件中进行显式埋点,虽然会导致代码间耦合度增加,但是方便后续定位问题。其次,为了整个框架的统一,MTrace并非仅支持Java一种语言,而AOP的特性很多语言是不支持的。

Agent

  • 透传数据,用作数据转发;
  • 做流量控制;
  • 控制反转,很多策略可以通过agent实现,而不需要每次都升级业务代码中的SDK。

Agent仅仅会转发数据,由Agent判断将数据转发到哪里,这样就可以通过Agent做数据路由、流量控制等操作。也正是由于Agent的存在,使得我们可以在Agent层实现一些功能,而不需要业务端做SDK的升级,要知道业务端SDK升级的过程是很缓慢的,这对于整个调用链的系统来说是不可接受的,因为MTrace整个系统是针对庞大的分布式系统而言的,有一环的服务缺失也会造成一定的问题。

目前MTrace支持的中间件有:

  • 公司内部RPC中间件
  • http中间件
  • mysql中间件
  • tair中间件
  • mq中间件

数据埋点的四个阶段:

  • Client Send : 客户端发起请求时埋点,需要传递一些参数,比如服务的方法名等

      Span span = Tracer.clientSend(param);
    
  • Server Recieve : 服务端接收请求时埋点,需要回填一些参数,比如traceId,spanId

      Tracer.serverRecv(param);
    
  • ServerSend : 服务端返回请求时埋点,这时会将上下文数据传递到异步上传队列中

      Tracer.serverSend();
    
  • Client Recieve : 客户端接收返回结果时埋点,这时会将上下文数据传递到异步上传队列中

      Tracer.clientRecv();
    

埋点上下文

上图CS、SR为创建上下文的位置,CR、SS为归档上下文的位置。

上下文归档

上下文归档,会把上下文数据异步上传到后端,为了减轻对业务端的影响,上下文上报采用的是异步队列的方式,数据不会落地,直接通过网络形式传递到后端服务,在传递之前会对数据做一层压缩,主要是压缩比很可观,可以达到10倍以上,所以就算牺牲一点CPU资源也是值得的。具体上报的数据如图所示:

我们之前在数据埋点时遇到了一些问题:

  • 异步调用
    • 异步IO造成的线程切换,不能通过ThreadLocal传递上下文。
    • 显式的通过API进行埋点传递,切换前保存,切换后还原。
    • 提供封装好的ThreadPool库。
  • 数据量大,每天千亿级别的数据
    • 批量上报
    • 数据压缩
    • 极端情况下采样

数据存储

Kafka使用

我们在SDK与后端服务之间加了一层Kafka,这样做既可以实现两边工程的解耦,又可以实现数据的延迟消费。我们不希望因为瞬时QPS过高而引起的数据丢失,当然为此也付出了一些实效性上的代价。

实时数据Hbase

调用链路数据的实时查询主要是通过Hbase,使用traceID作为RowKey,能天然的把一整条调用链聚合在一起,提高查询效率。

离线数据Hive

离线数据主要是使用Hive,可以通过SQL进行一些结构化数据的定制分析。比如链路的离线形态,服务的出度入度(有多少服务调用了该服务,该服务又调用了多少下游服务)

前端展示

前端展示,主要遇到的问题是NTP同步的问题,因为调用链的数据是从不同机器上收集上来的,那么聚合展示的时候就会有NTP时间戳不同步的问题,这个问题很难解决,于是我们采取的方式是前端做一层适配,通过SpanId定位调用的位置而不是时间,比如0.2一定是发生在0.1这个Span之后的调用,所以如果时间出现漂移,就会根据SpanId做一次校正。即判断时间顺序的优先级为最高是spanid,然后是时间戳。

总结

核心概念:调用链;
用途:定位系统瓶颈,优化系统结构、统计系统指标、分析系统数据;
架构:埋点上报、收集计算、展示分析。

分布式会话跟踪系统主要的特点就是能关联服务之间的联动关系,通过这层关系可以延伸出来很多有意义的分析数据,统计数据。为优化系统结构,查询系统瓶颈问题带来了极大的便利。

原文出自:https://tech.meituan.com/archives

美团数据仓库-数据脱敏

美团点评阅读(2940)

背景与目标

在数据仓库建设过程中,数据安全扮演着重要角色,因为隐私或敏感数据的泄露,会对数据主体(客户,员工和公司)的财产、名誉、人身安全、以及合法利益造成严重损害。因此我们需要严格控制对仓库中的数据访问,即什么样的人员或者需求才可以访问到相关的数据。这就要求对数据本身的敏感程度进行安全级别划分。数据有了安全等级的划分,才能更好管理对数据访问控制,以此来保护好数据安全。
举个例子简单的说明下,例如我们仓库中有一张关于注册用户的基本信息表User,其中有手机号mobile,昵称username两个字段。我们在划分数据安全层级的时,将用户mobile的安全等级划分为L2要高于username的等级L1,并规定只有访问权限达到L2的运营部门才能访问mobile字段。这样在公司各个部门需要访问注册用户基本信息表User时,我们只需检查访问者是否来自运营部门,如果是运营部可以访问mobile,如果不是只能访问username信息了。这样就有效的防止用户手机号被不相关工作人员泄露出去,同时也不影响查询用户username的需求。
但是往往在实际生产过程中,应用场景会更加复杂,仅靠类似这样的访问控制,满足不了生产的需要,还需要结合其它的途径,而数据脱敏就是一种有效的方式,既能满足日常生产的需要,又能保护数据安全。
数据脱敏,具体指对某些敏感信息通过脱敏规则进行数据的变形,实现敏感隐私数据的可靠保护。这样可以使数据本身的安全等级降级,就可以在开发、测试和其它非生产环境以及外包或云计算环境中安全地使用脱敏后的真实数据集。借助数据脱敏技术,屏蔽敏感信息,并使屏蔽的信息保留其原始数据格式和属性,以确保应用程序可在使用脱敏数据的开发与测试过程中正常运行。

敏感数据梳理

在数据脱敏进行之前,我们首先要确定哪些数据要作为脱敏的目标。我们根据美团特有的业务场景和数据安全级别划分(绝密、高保密、保密、可公开,四个级别), 主要从“高保密”等级的敏感数据,开始进行梳理。
这里我们把敏感数据分成四个维度进行梳理,用户、商家、终端、公司。

  1. 从用户维度进行梳理可能有这些敏感字段如下:手机号码、邮件地址、账号、地址、固定电话号码等信息(此外个人隐私数据相关还有如:种族、政治观点、宗教信仰、基因等)
  2. 从商家维度进行梳理:合同签订人,合同签订人电话等(不排除全局敏感数据:如商家团购品类等)
  3. 从用户终端维度进行梳理:能够可能标识终端的唯一性字段,如设备id。
  4. 从公司角度进行梳理:交易金额、代金卷密码、充值码等

确定脱敏处理方法

梳理出了敏感数据字段,我们接下来的工作就是如何根据特定的应用场景对敏感字段实施具体的脱敏处理方法。
常见的处理方法如下几种有:

  1. 替换:如统一将女性用户名替换为F,这种方法更像“障眼法”,对内部人员可以完全保持信息完整性,但易破解。
  2. 重排:序号12345重排为54321,按照一定的顺序进行打乱,很像“替换”, 可以在需要时方便还原信息,但同样易破解。
  3. 加密:编号12345加密为23456,安全程度取决于采用哪种加密算法,一般根据实际情况而定。
  4. 截断:13811001111截断为138,舍弃必要信息来保证数据的模糊性,是比较常用的脱敏方法,但往往对生产不够友好。
  5. 掩码: 123456 -> 1xxxx6,保留了部分信息,并且保证了信息的长度不变性,对信息持有者更易辨别, 如火车票上得身份信息。
  6. 日期偏移取整:20130520 12:30:45 -> 20130520 12:00:00,舍弃精度来保证原始数据的安全性,一般此种方法可以保护数据的时间分布密度。

但不管哪种手段都要基于不同的应用场景,遵循下面两个原则:
1.remain meaningful for application logic(尽可能的为脱敏后的应用,保留脱敏前的有意义信息)
2.sufficiently treated to avoid reverse engineer(最大程度上防止黑客进行破解)
以这次脱敏一个需求为例:
美团一般的业务场景是这样的,用户在网站上付款一笔团购单之后,我们会将团购密码,发到用户对应的手机号上。这个过程中,从用户的角度来看团购密码在未被用户消费之前,对用户来说是要保密的,不能被公开的,其次美团用户的手机号也是要保密的,因为公开之后可能被推送一些垃圾信息,或者更严重的危害。从公司内部数据分析人员来看,他们有时虽然没有权限知道用户团购密码,但是他们想分析公司发送的团购密码数量情况,这是安全允许;再有数据分析人员虽然没有权限知道用户具体的手机号码,但是他们需要统计美团用户手机的地区分布情况,或者运营商分布差异,进而为更上层的决策提供支持。
根据这样的需求,我们可以对团购密码做加密处理保证其唯一性,也保留其原有的数据格式,在保密的同时不影响数据分析的需求。同样,我们将用户的手机号码的前7位,关于运营商和地区位置信息保留,后四位进行模糊化处理。这样同样也达到了保护和不影响统计的需求。

因此从实际出发遵循上面的两个处理原则,第一阶段我们在脱敏工具集中,确定了如下4种基本类型的脱敏方案(对应4个udf):

字段名称 脱敏方法 举例 脱敏原则
电话号码(moblie)掩码13812345678-> 13812340000防止号码泄露,但保留运营商和地区信息 (唯一性,由前端绑定或者注册时约束)
邮件(email)截断+ 加密hxs@163.com -> 6225888e3a1d4a139f5f5db98d846102b2cd0d@163.com保留邮件域信息
团购密码(code)加密4023926843399219 -> 1298078978加密后在一定精度上保持唯一性,并与数据类型一致
设备号(deviceid)加密ffbacff42826302d9e832b7e907a212a -> b9c2a61972a19bf21b06b0ddb8ba642d加密后保持唯一性

确定实施范围与步骤

通过上面字段的梳理和脱敏方案的制定,我们对美团数据仓库中涉及到得敏感字段的表进行脱敏处理。在数据仓库分层理论中,数据脱敏往往发生在上层,最直接的是在对外开放这一层面上。在实际应用中,我们既要参考分层理论,又要从美团现有数据仓库生产环境的体系出发,主要在数据维度层(dim),以及基础服务数据层(fact)上实施脱敏。这样,我们可以在下游相关数据报表以及衍生数据层的开发过程中使用脱敏后的数据,从而避免出现数据安全问题。
确认处理的表和字段后,我们还要确保相关上下游流程的正常运行, 以及未脱敏的敏感信息的正常产出与存储(通过更严格的安全审核来进行访问)。
以用户信息表user为例,脱敏步骤如下:
1.首先生产一份ndm_user未脱敏数据,用于未脱敏数据的正常产出。
2.对下游涉及的所有依赖user生产流程进行修改,来确保脱敏后的正常运行,这里主要是确认数据格式,以及数据源的工作。
3.根据对应的脱敏方法对user表中对应的字段进行脱敏处理。

总结

通过上面的几个步骤的实施,我们完成了第一阶段的数据脱敏工作。在数据脱敏方案设计与实施过程中, 我们觉得更重要的还是从特定的应用场景出发进行整体设计,兼顾了数据仓库建设这一重要考量维度。数据脱敏实施为公司数据安全的推进,提供了有力支持。当然,我们第一阶段脱敏的工具集还相对较少,需要补充。 脱敏的技术架构还有待完善和更加自动化。
本文关于数据安全和数据访问隔离的控制阐述较少,希望通过以后的生产实践,继续为大家介绍。

参考

参考文献如下:

  1. http://en.wikipedia.org/wiki/Data_masking
  2. http://www.prnews.cn/press_release/51034.htm

原文出自:https://tech.meituan.com/archives

CentOS6上Hadoop集群中服务器cpu sys态异常的定位与解决

美团点评阅读(2824)

问题现象

  1. 在zabbix系统中,对Hadoop集群的历史监控数据分析时,发现在执行大Job任务时,某些服务节点的cpu sys态很高;
  2. 具体以hadoop_A服务节点为例,在10:15-10:40这个时间段,cpu user态为60%,而sys态则高达35%;
  3. 对于整个Hadoop集群,并不是所有的节点都会出现sys过高的问题,产生此类问题的都是部署CentOS6系统的节点。

定位分析

  1. 根据zabbix系统中cpu sys很高的问题发生时间,找到触发问题的大Job,以便于后面的问题重现和问题验证;
  2. 对问题节点hadoop_A的硬件信息和OS系统日志/var/log/messages进行初步检查,并未发现异常;
  3. 重启Job,重现问题。并使用nmon工具对问题节点hadoop_A的资源负载进行粗粒度的实时监测;
    hadoop_A节点上某一时刻瞬时的负载状态如下图:
    Alt text
  4. 通过上图,注意到网络流量达到了119.7MB/s,接收和发送的峰值都超过了120MB/s,初步怀疑网口在某一时间成为瓶颈,导致内核的sys过高。对hadoop_A的网口计数器细化分析,系统在uptime了83天的状态下,网口计数器中除overruns指标达22万之外,其他的网络指标正常。 这说明网络确实曾达到过峰值,也丢过包,但频率非常低,sys过高的问题应该不是网络负载过高触发。
    ifconfig查询网口的计数器状态如下图:
    Alt text
  5. 需要对系统进行更细粒度的分析,找出系统sys态消耗在什么地方。在hadoop_A节点上部署perf工具,通过perf top对kernel事件采样,实时分析内核事件。
    perf top在某一时刻的状态图如下:
    Alt text
    通过perf top监控可以断定:kernel中存在频繁的spin_lock_irqsave内核系统调用, sys态消耗过高应该与此有关。
  6. 重启Job,再次重现问题,并利用perf工具对内核函数的调用关系采样:
    perf record -a -g -F 1000 sleep 30
    采样结束后,在当前目录上会生成一个perf.data文件,使用perf工具查看函数调用关系:
    perf report -g
    perf report查看到的调用关系如下图:
    Alt text
  7. 通过调用依赖关系分析,spin_lock_irqsave主要called by compaction_alloc,初步推测问题由kernel的内存管理部分触发。联想到centos 6相对于centos 5在kernel内存管理模块的一些改进点(如transparent huge page, 基于numa的内存分配等),有没有可能是CentOS6新增的THP特性导致cpu sys过高?再在google上搜一把相关函数名的关键字,印证这个猜测。

问题验证

  1. 选择在节点hadoop_A上面做验证测试,通过以下内核参数优化关闭系统THP特性:
     echo never > /sys/kernel/mm/redhat_transparent_hugepage/enabled
     echo never > /sys/kernel/mm/redhat_transparent_hugepage/defrag
    
  2. 重启触发问题的大Job,在hadoop_A节点未出现cpu sys 状态过高的现象。
  3. 在生产系统上运行24小时后,通过zabbix系统观察,其他内核未优化节点如hadoop_B,hadoop_C等节点依然存在cpu sys态过高的问题,而关闭了THP特性的hadoop_A节点没有出现cpu sys态过高的问题,验证了之前的分析。
    hadoop_B和hadoop_C依然存在cpu sys态过高的问题:
    Alt text
    hadoop_A cpu sys态正常:
    Alt text

结论

将Hadoop集群中所有CentOS6类型节点的THP特性关闭掉(在CentOS6中,THP特性默认都是打开的),关闭方法如下:

    echo never > /sys/kernel/mm/redhat_transparent_hugepage/enabled
    echo never > /sys/kernel/mm/redhat_transparent_hugepage/defrag

值得注意的是,需要在puppet系统中部署该项优化,以免节点重启导致修改丢失。

参考

事后,在redhat官网和cloudera官网也搜到了相关的内容,附录下来,供参考。

  1. 在redhat的官网上,有对THP特性的细化说明:
    https://access.redhat.com/site/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Performance_Tuning_Guide/s-memory-transhuge.html
  2. 在cloudera的CDH4部署说明中,也建议将系统的THP的compaction特性关闭:
    http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH4/4.2.2/CDH4-Installation-Guide/cdh4ig_topic_11_6.html

原文出自:https://tech.meituan.com/archives

全球互联网技术架构,前沿架构参考

联系我们博客/网站内容提交