若谷学院
互联网公司技术架构分享

Spark Streaming + Elasticsearch构建App异常监控平台

美团点评阅读(18)

本文已发表在《程序员》杂志2016年10月期。

如果在使用App时遇到闪退,你可能会选择卸载App、到应用商店怒斥开发者等方式来表达不满。但开发者也同样感到头疼,因为崩溃可能意味着用户流失、营收下滑。为了降低崩溃率,进而提升App质量,App开发团队需要实时地监控App异常。一旦发现严重问题,及时进行热修复,从而把损失降到最低。App异常监控平台,就是将这个方法服务化。

低成本

小型创业团队一般会选择第三方平台提供的异常监控服务。但中型以上规模的团队,往往会因为不想把核心数据共享给第三方平台,而选择独立开发。造轮子,首先要考虑的就是成本问题。我们选择了站在开源巨人的肩膀上,如图1所示。

 图1 数据流向示意图

Spark Streaming

每天来自客户端和服务器的大量异常信息,会源源不断的上报到异常平台的Kafka中,因此我们面临的是一个大规模流式数据处理问题。美团点评数据平台提供了Storm和Spark Streaming两种流式计算解决方案。我们主要考虑到团队之前在Spark批处理方面有较多积累,使用Spark Streaming成本较低,就选择了后者。

Elasticsearch

Elasticsearch(后文简称ES),是一个开源搜索引擎。不过在监控平台中,我们是当做“数据库”来使用的。为了降低展示层的接入成本,我们还使用了另一个开源项目ES SQL提供类SQL查询。ES的运维成本,相对 SQL on HBase方案也要低很多。整个项目开发只用了不到700行代码,开发维护成本还是非常低的。那如此“简单”的系统,可用性可以保证吗?

高可用

Spark Streaming + Kafka的组合,提供了“Exactly Once”保证:异常数据经过流式处理后,保证结果数据中(注:并不能保证处理过程中),每条异常最多出现一次,且最少出现一次。保证Exactly Once是实现24/7的高可用服务最困难的地方。在实际生产中会出现很多情况,对Exactly Once的保证提出挑战:

异常重启

Spark提供了Checkpoint功能,可以让程序再次启动时,从上一次异常退出的位置,重新开始计算。这就保证了即使发生异常情况,也可以实现每条数据至少写一次HDFS。再覆写相同的HDFS文件就保证了Exactly Once(注:并不是所有业务场景都允许覆写)。写ES的结果也一样可以保证Exactly Once。你可以把ES的索引,就当成HDFS文件一样来用:新建、删除、移动、覆写。
作为一个24/7运行的程序,在实际生产中,异常是很常见的,需要有这样的容错机制。但是否遇到所有异常,都要立刻挂掉再重启呢?显然不是,甚至在一些场景下,你即使重启了,还是会继续挂掉。我们的解决思路是:尽可能把异常包住,让异常发生时,暂时不影响服务。

 图 2 作业异常重启架构图

如图2所示,包住异常,并不意味可以忽略它,必须把异常收集到Spark Driver端,接入监控(报警)系统,人工判断问题的严重性,确定修复的优先级。
为了更好地掌控Spark Streaming服务的状态,我们还单独开发了一个作业调度(重启)工具。美团点评数据平台安全认证的有效期是7天,一般离线的批处理作业很少会运行超过这个时间,但Spark Streaming作业就不同了,它需要一直保持运行,所以作业只要超过7天就会出现异常。因为没有找到优雅的解决方案,只好粗暴地利用调度工具,每周重启刷新安全认证,来保证服务的稳定。

升级重导

Spark提供了2种读取Kafka的模式:“Receiver-based Approach”和“Direct Approach”。使用Receiver模式,在极端情况下会出现Receiver OOM问题。
使用Direct模式可以避免这个问题。我们使用的就是这种Low-level模式,但在一些情况下需要我们自己维护Kafka Offset:
升级代码:开启Checkpoint后,如果想改动代码,需要清空之前的Checkpoint目录后再启动,否则改动可能不会生效。但当这样做了之后,就会发现另一个问题——程序“忘记”上次读到了哪个位置,因为存储在Checkpoint中的Offset信息也一同被清空了。这种情况下,需要自己用ZooKeeper维护Kafka的Offset。
重导数据:重导数据的场景也是,当希望从之前的某一个时间点开始重新开始计算的时候,显然也需要自己维护时间和Offset的映射关系。
自己维护Offset的成本并不高,所以看起来Checkpoint功能很鸡肋。其实可以有一些特殊用法的,例如,因为Python不需要编译,所以如果使用的是PySpark,可以把主要业务逻辑写在提交脚本的外边,再使用Import调用。这样升级主要业务逻辑代码时,只要重启一下程序即可。网上有不少团队分享过升级代码的“黑科技”,这里不再展开。
实现24/7监控服务,我们不仅要解决纯稳定性问题,还要解决延迟问题。

低延迟

App异常监控,需要保证数据延迟在分钟级。
虽然Spark Streaming有着强大的分布式计算能力,但要满足用户角度的低延迟,可不是单纯的能计算完这么简单。

输入问题

iOS App崩溃时,会生成Crash Log,但其内容是一堆十六进制的内存地址,对开发者来说就是“天书”。只有经过“符号化”的Crash Log,开发者才能看懂。因为符号化需要在Mac环境下进行,而我们的Mac集群资源有限,不能符号化全部Crash Log。即使做了去重等优化,符号化后的数据流还是有延迟。每条异常信息中,包含N维数据,如果不做符号化只能拿到其中的M维。

 图 3 双延迟乱序数据流融合示意图

如图3所示,我们将数据源分为符号化数据流、未符号化数据流,可以看出两个数据流的相对延迟时间T较稳定。如果直接使用符号化后的数据流,那么全部N维数据都会延迟时间T。为了降低用户角度的延迟,我们根据经验加大了时间窗口:先存储未符号化的M维数据,等到拿到对应的符号化数据后,再覆写全部N维数据,这样就只有N-M维数据延迟时间T了。

输出问题

如果Spark Streaming计算结果只是写入HDFS,很难遇到什么性能问题。但你如果想写入ES,问题就来了。因为ES的写入速度大概是每秒1万行,只靠增加Spark Streaming的计算能力,很难突破这个瓶颈。
异常数据源的特点是数据量的波峰波谷相差巨大。由于我们使用了 Direct 模式,不会因为数据量暴涨而挂掉,但这样的“稳定”从用户角度看没有任何意义:短时间内,数据延迟会越来越大,暴增后新出现的异常无法及时报出来。为了解决这个问题,我们制定了一套服务降级方案。

 服务降级方案示意图

如图4所示,我们根据写ES的实际瓶颈K,对每个周期处理的全部数据N使用水塘抽样(比例K/N),保证始终不超过瓶颈。并在空闲时刻使用Spark批处理,将N-K部分从HDFS补写到ES。既然写ES这么慢,那我们为什么还要用ES呢?

高性能

开发者需要在监控平台上分析异常。实际分析场景可以抽象描述为:“实时 秒级 明细 聚合” 数据查询。
我们团队在使用的OLAP解决方案可以分为4种,它们各有各的优势:

  • SQL on HBase方案,例如:Phoenix、Kylin。我们团队从2015年Q1开始,陆续在SEM、SEO生产环境中使用Phoenix、Kylin至今。Phoenix算是一个“全能选手”,但更适合业务模式较固定的场景;Kylin是一个很不错的OLAP产品,但它的问题是不能很好支持实时查询和明细查询,因为它需要离线预聚合。另外,基于其他NoSQL的方案,基本大同小异,如果选择HBase,建议团队在HBase运维方面有一定积累。
  • SQL on HDFS方案,例如:Presto、Spark SQL。这两个产品,因为只能做到亚秒级查询,我们平时多用在数据挖掘的场景中。
  • 时序数据库方案,例如:Druid、OpenTSDB。OpenTSDB是我们旧版App异常监控系统使用过的方案,更适合做系统指标监控。
  • 搜索引擎方案,代表项目有ES。相对上面的3种方案,基于倒排索引的ES非常适合异常分析的场景,可以满足:实时、秒级、明细、聚合,全部4种需求。

ES在实际使用中的表现如何呢?

明细查询

支持明显查询,算是ES的主要特色,但因为是基于倒排索引的,明细查询的结果最多只能取到10000条。在异常分析中,使用明细查询的场景,其实就是追查异常Case,根据条件返回前100条就能满足需求了。例如:已知某设备出现了Crash,直接搜索这个设备的DeviceId就可以看到这个设备最近的异常数据。我们在生产环境中做到了95%的明细查询场景1秒内返回。

聚合查询

面对爆炸的异常信息,一味追求全是不现实,也是没必要的。开发者需要能快速发现关键问题。
因此平台需要支持多维度聚合查询,例如按模块版本机型城市等分类聚合,如图5所示。

 图 5 聚合查询页面截图

不用做优化,ES聚合查询的性能就已经可以满足需求。因此,我们只做了一些小的使用改进,例如:很多异常数据在各个维度的值都是相同的,做预聚合可以提高一些场景下的查询速度。开发者更关心最近48小时发生的异常,分离冷热数据,自动清理历史数据也有助于提升性能。最终在生产环境中,做到了90%的聚合查询场景1秒内返回。

可扩展

异常平台不止要监控App Crash,还要监控服务端的异常、性能等。不同业务的数据维度是不同的,相同业务的数据维度也会不断的变化,如果每次新增业务或维度都需要修改代码,那整套系统的升级维护成本就会很高。

维度

为了增强平台的可扩展性,我们做了全平台联动的动态维度扩展:如果App开发人员在日志中新增了一个“城市”维度,那么他不需要联系监控平台做项目排期,立刻就可以在平台中查询“城市”维度的聚合数据。只需要制定好数据收集、数据处理、数据展示之间的交互协议,做到动态维度扩展就很轻松了。需要注意的是,ES中需要聚合的维度,Index要设置为“not_analyzed”。
想要支持动态字段扩展,还要使用动态模板,样例如下:

{
    "mappings": {
        "es_type_name": {
            "dynamic_templates": [
                {
                    "template_1": {
                        "match": "*log*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "string"
                        }
                    }
                },
                {
                    "template_2": {
                        "match": "*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "string",
                            "index": "not_analyzed"
                        }
                    }
                }
            ]
        }
    }
}

资源

美团点评数据平台提供了Kafka、Spark、ES的集群,整套技术栈在资源上也是分布式可扩展的。
线上集群使用的版本:

  • kafka-0.8.2.0
  • spark-1.5.2
  • elasticsearch-2.1.1

原文出自:https://tech.meituan.com/archives

深度剖析开源分布式监控CAT

美团点评阅读(26)

CAT(Central Application Tracking)是一个实时和接近全量的监控系统,它侧重于对Java应用的监控,基本接入了美团点评上海侧所有核心应用。目前在中间件(MVC、RPC、数据库、缓存等)框架中得到广泛应用,为美团点评各业务线提供系统的性能指标、健康状况、监控告警等。自2014年开源以来,除了美团点评之外,CAT还在携程、陆金所、猎聘网、找钢网等多家互联网公司生产环境应用,项目的开源地址是 http://github.com/dianping/cat

本文会对CAT整体设计、客户端、服务端等的一些设计思路做详细深入的介绍。

背景介绍

CAT整个产品研发是从2011年底开始的,当时正是大众点评从.NET迁移到Java的核心起步阶段。当初大众点评已经有核心的基础中间件、RPC组件Pigeon、统一配置组件Lion。整体Java迁移已经在服务化的路上。随着服务化的深入,整体Java在线上部署规模逐渐变多,同时,暴露的问题也越来越多。典型的问题有:

  • 大量报错,特别是核心服务,需要花很久时间才能定位。
  • 异常日志都需要线上权限登陆线上机器排查,排错时间长。
  • 有些简单的错误定位都非常困难(一次将线上的库配置到了Beta,花了整个通宵排错)。
  • 很多不了了之的问题怀疑是网络问题(从现在看,内网真的很少出问题)。

虽然那时候也有一些简单的监控工具(比如Zabbix,自己研发的Hawk系统等),可能单个工具在某方面的功能还不错,但整体服务化水平参差不齐、扩展能力相对较弱,监控工具间不能互通互联,使得查找问题根源基本都需要在多个系统之间切换,有时候真的是靠“人品”才能找出根源。

适逢在eBay工作长达十几年的吴其敏加入大众点评成为首席架构师,他对eBay内部应用非常成功的CAL系统有深刻的理解。就在这样天时地利人和的情况下,我们开始研发了大众点评第一代监控系统——CAT。

CAT的原型和理念来源于eBay的CAL系统,最初是吴其敏在大众点评工作期间设计开发的。他之前曾CAT不仅增强了CAL系统核心模型,还添加了更丰富的报表。

整体设计

监控整体要求就是快速发现故障、快速定位故障以及辅助进行程序性能优化。为了做到这些,我们对监控系统的一些非功能做了如下的要求:

  • 实时处理:信息的价值会随时间锐减,尤其是事故处理过程中。
  • 全量数据:最开始的设计目标就是全量采集,全量的好处有很多。
  • 高可用:所有应用都倒下了,需要监控还站着,并告诉工程师发生了什么,做到故障还原和问题定位。
  • 故障容忍:CAT本身故障不应该影响业务正常运转,CAT挂了,应用不该受影响,只是监控能力暂时减弱。
  • 高吞吐:要想还原真相,需要全方位地监控和度量,必须要有超强的处理吞吐能力。
  • 可扩展:支持分布式、跨IDC部署,横向扩展的监控系统。
  • 不保证可靠:允许消息丢失,这是一个很重要的trade-off,目前CAT服务端可以做到4个9的可靠性,可靠系统和不可靠性系统的设计差别非常大。

CAT从开发至今,一直秉承着简单的架构就是最好的架构原则,主要分为三个模块:CAT-client、CAT-consumer、CAT-home。

  • Cat-client 提供给业务以及中间层埋点的底层SDK。
  • Cat-consumer 用于实时分析从客户端提供的数据。
  • Cat-home 作为用户给用户提供展示的控制端。

在实际开发和部署中,Cat-consumer和Cat-home是部署在一个JVM内部,每个CAT服务端都可以作为consumer也可以作为home,这样既能减少整个层级结构,也可以增加系统稳定性。
image

上图是CAT目前多机房的整体结构图,图中可见:

  • 路由中心是根据应用所在机房信息来决定客户端上报的CAT服务端地址,目前美团点评有广州、北京、上海三地机房。
  • 每个机房内部都有独立的原始信息存储集群HDFS。
  • CAT-home可以部署在一个机房也可以部署在多个机房,在最后做展示的时候,home会从consumer中进行跨机房的调用,将所有的数据合并展示给用户。
  • 实际过程中,consumer、home以及路由中心都是部署在一起的,每个服务端节点都可以充当任何一个角色。

客户端设计

客户端设计是CAT系统设计中最为核心的一个环节,客户端要求是做到API简单、高可靠性能,无论在任何场景下都不能影响客业务性能,监控只是公司核心业务流程一个旁路环节。CAT核心客户端是Java,也支持Net客户端,近期公司内部也在研发其他多语言客户端。以下客户端设计及细节均以Java客户端为模板。

设计架构

CAT客户端在收集端数据方面使用ThreadLocal(线程局部变量),是线程本地变量,也可以称之为线程本地存储。其实ThreadLocal的功用非常简单,就是为每一个使用该变量的线程都提供一个变量值的副本,属于Java中一种较为特殊的线程绑定机制,每一个线程都可以独立地改变自己的副本,不会和其它线程的副本冲突。

在监控场景下,为用户提供服务都是Web容器,比如tomcat或者Jetty,后端的RPC服务端比如Dubbo或者Pigeon,也都是基于线程池来实现的。业务方在处理业务逻辑时基本都是在一个线程内部调用后端服务、数据库、缓存等,将这些数据拿回来再进行业务逻辑封装,最后将结果展示给用户。所以将所有的监控请求作为一个监控上下文存入线程变量就非常合适。

image

如上图所示,业务执行业务逻辑的时候,就会把此次请求对应的监控存放于线程上下文中,存于上下文的其实是一个监控树的结构。在最后业务线程执行结束时,将监控对象存入一个异步内存队列中,CAT有个消费线程将队列内的数据异步发送到服务端。

API设计

监控API定义往往取决于对监控或者性能分析这个领域的理解,监控和性能分析所针对的场景有如下几种:

  • 一段代码的执行时间,一段代码可以是URL执行耗时,也可以是SQL的执行耗时。
  • 一段代码的执行次数,比如Java抛出异常记录次数,或者一段逻辑的执行次数。
  • 定期执行某段代码,比如定期上报一些核心指标:JVM内存、GC等指标。
  • 关键的业务监控指标,比如监控订单数、交易额、支付成功率等。

在上述领域模型的基础上,CAT设计自己核心的几个监控对象:Transaction、Event、Heartbeat、Metric。

一段监控API的代码示例如下:
image

序列化和通信

序列化和通信是整个客户端包括服务端性能里面很关键的一个环节。

  • CAT序列化协议是自定义序列化协议,自定义序列化协议相比通用序列化协议要高效很多,这个在大规模数据实时处理场景下还是非常有必要的。
  • CAT通信是基于Netty来实现的NIO的数据传输,Netty是一个非常好的NIO开发框架,在这边就不详细介绍了。

客户端埋点

日志埋点是监控活动的最重要环节之一,日志质量决定着监控质量和效率。当前CAT的埋点目标是以问题为中心,像程序抛出exception就是典型问题。我个人对问题的定义是:不符合预期的就可以算问题,比如请求未完成、响应时间快了慢了、请求TPS多了少了、时间分布不均匀等等。

在互联网环境中,最突出的问题场景,突出的理解是:跨越边界的行为。包括但不限于:

  • HTTP/REST、RPC/SOA、MQ、Job、Cache、DAL;
  • 搜索/查询引擎、业务应用、外包系统、遗留系统;
  • 第三方网关/银行, 合作伙伴/供应商之间;
  • 各类业务指标,如用户登录、订单数、支付状态、销售额。

遇到的问题

通常Java客户端在业务上使用容易出问题的地方就是内存,另外一个是CPU。内存往往是内存泄露,占用内存较多导致业务方GC压力增大; CPU开销最终就是看代码的性能。

以前我们遇到过一个极端的例子,我们一个业务请求做餐饮加商铺的销售额,业务一般会通过for循环所有商铺的分店,结果就造成内存OOM了,后来发现这家店是肯德基,有几万分店,每个循环里面都会有数据库连接。在正常场景下,ThreadLocal内部的监控一个对象就存在几万个节点,导致业务Oldgc特别严重。所以说框架的代码是不能想象业务方会怎么用你的代码,需要考虑到任何情况下都有出问题的可能。

在消耗CPU方面我们也遇到一个case:在某个客户端版本,CAT本地存储当前消息ID自增的大小,客户端使用了MappedByteBuffer这个类,这个类是一个文件内存映射,测试下来这个类的性能非常高,我们仅仅用这个存储了几个字节的对象,正常情况理论上不会有任何问题。在一次线上场景下,很多业务线程都block在这个上面,结果发现当本身这台机器IO存在瓶颈时候,这个也会变得很慢。后来的优化就是把这个IO的操作异步化,所以客户端需要尽可能异步化,异步化序列化、异步化传输、异步化任何可能存在时间延迟的代码操作

服务端设计

服务端主要的问题是大数据的实时处理,目前后端CAT的计算集群大约35台物理机,存储集群大约35台物理机,每天处理了约100TB的数据量。线上单台机器高峰期大约是110MB/s,接近千兆网打满。

下面我重点讲下CAT服务端一些设计细节。

架构设计

在最初的整体介绍中已经画了架构图,这边介绍下单机的consumer中大概的结构如下:

image

如上图,CAT服务端在整个实时处理中,基本上实现了全异步化处理。

  • 消息接受是基于Netty的NIO实现。
  • 消息接受到服务端就存放内存队列,然后程序开启一个线程会消费这个消息做消息分发。
  • 每个消息都会有一批线程并发消费各自队列的数据,以做到消息处理的隔离。
  • 消息存储是先存入本地磁盘,然后异步上传到HDFS文件,这也避免了强依赖HDFS。

当某个报表处理器处理来不及时候,比如Transaction报表处理比较慢,可以通过配置支持开启多个Transaction处理线程,并发消费消息。

image

实时分析

CAT服务端实时报表分析是整个监控系统的核心,CAT重客户端采集的是是原始的logview,目前一天大约有1000亿的消息,这些原始的消息太多了,所以需要在这些消息基础上实现丰富报表,来支持业务问题及性能分析的需要。

CAT是根据日志消息的特点(比如只读特性)和问题场景,量身定做的,它将所有的报表按消息的创建时间,一小时为单位分片,那么每小时就产生一个报表。当前小时报表的所有计算都是基于内存的,用户每次请求即时报表得到的都是最新的实时结果。对于历史报表,因为它是不变的,所以实时不实时也就无所谓了。

CAT基本上所有的报表模型都可以增量计算,它可以分为:计数、计时和关系处理三种。计数又可以分为两类:算术计数和集合计数。典型的算术计数如:总个数(count)、总和(sum)、均值(avg)、最大/最小(max/min)、吞吐(tps)和标准差(std)等,其他都比较直观,标准差稍微复杂一点,大家自己可以推演一下怎么做增量计算。那集合运算,比如95线(表示95%请求的完成时间)、999线(表示99.9%请求的完成时间),则稍微复杂一些,系统开销也更大一点。

报表建模

CAT每个报表往往有多个维度,以transaction报表为例,它有5个维度,分别是应用、机器、Type、Name和分钟级分布情况。如果全维度建模,虽然灵活,但开销将会非常之大。CAT选择固定维度建模,可以理解成将这5个维度组织成深度为5的树,访问时总是从根开始,逐层往下进行。

CAT服务端为每个报表单独分配一个线程,所以不会有锁的问题,所有报表模型都是非线程安全的,其数据是可变的。这样带来的好处是简单且低开销。

CAT报表建模是使用自研的Maven Plugin自动生成的。所有报表是可合并和裁剪的,可以轻易地将2个或多个报表合并成一个报表。在报表处理代码中,CAT大量使用访问者模式(visitor pattern)。

性能分析报表

image

故障发现报表

  • 实时业务指标监控 :核心业务都会定义自己的业务指标,这不需要太多,主要用于24小时值班监控,实时发现业务指标问题,图中一个是当前的实际值,一个是基准值,就是根据历史趋势计算的预测值。如下图就是当时的情景,能直观看到支付业务出问题的故障。

    image

  • 系统报错大盘。

  • 实时数据库大盘、服务大盘、缓存大盘等。

存储设计

CAT系统的存储主要有两块:

  • CAT的报表的存储。
  • CAT原始logview的存储。

报表是根据logview实时运算出来的给业务分析用的报表,默认报表有小时模式、天模式、周模式以及月模式。CAT实时处理报表都是产生小时级别统计,小时级报表中会带有最低分钟级别粒度的统计。天、周、月等报表都是在小时级别报表合并的结果报表。

原始logview存储一天大约100TB的数据量,因为数据量比较大所以存储必须要要压缩,本身原始logview需要根据Message-ID读取,所以存储整体要求就是批量压缩以及随机读。在当时场景下,并没有特别合适成熟的系统以支持这样的特性,所以我们开发了一种基于文件的存储以支持CAT的场景,在存储上一直是最难的问题,我们一直在这块持续的改进和优化。

消息ID的设计

CAT每个消息都有一个唯一的ID,这个ID在客户端生成,后续都通过这个ID在进行消息内容的查找。典型的RPC消息串起来的问题,比如A调用B的时候,在A这端生成一个Message-ID,在A调用B的过程中,将Message-ID作为调用传递到B端,在B执行过程中,B用context传递的Message-ID作为当前监控消息的Message-ID。

CAT消息的Message-ID格式ShopWeb-0a010680-375030-2,CAT消息一共分为四段:

  • 第一段是应用名shop-web。
  • 第二段是当前这台机器的IP的16进制格式,01010680表示10.1.6.108。
  • 第三段的375030,是系统当前时间除以小时得到的整点数。
  • 第四段的2,是表示当前这个客户端在当前小时的顺序递增号。

存储数据的设计

消息存储是CAT最有挑战的部分。关键问题是消息数量多且大,目前美团点评每天处理消息1000亿左右,大小大约100TB,单物理机高峰期每秒要处理100MB左右的流量。CAT服务端基于此流量做实时计算,还需要将这些数据压缩后写入磁盘。

整体存储结构如下图:

image

CAT在写数据一份是Index文件,一份是Data文件.

  • Data文件是分段GZIP压缩,每个分段大小小于64K,这样可以用16bits可以表示一个最大分段地址。
  • 一个Message-ID都用需要48bits的大小来存索引,索引根据Message-ID的第四段来确定索引的位置,比如消息Message-ID为ShopWeb-0a010680-375030-2,这条消息ID对应的索引位置为2*48bits的位置。
  • 48bits前面32bits存数据文件的块偏移地址,后面16bits存数据文件解压之后的块内地址偏移。
  • CAT读取消息的时候,首先根据Message-ID的前面三段确定唯一的索引文件,在根据Message-ID第四段确定此Message-ID索引位置,根据索引文件的48bits读取数据文件的内容,然后将数据文件进行GZIP解压,在根据块内便宜地址读取出真正的消息内容。

服务端设计总结

CAT在分布式实时方面,主要归结于以下几点因素:

  • 去中心化,数据分区处理。
  • 基于日志只读特性,以一个小时为时间窗口,实时报表基于内存建模和分析,历史报表通过聚合完成。
  • 基于内存队列,全面异步化、单线程化、无锁设计。
  • 全局消息ID,数据本地化生产,集中式存储。
  • 组件化、服务化理念。

总结感悟

最后我们再花一点点时间来讲一下我们在实践里做的一些东西。

一、MVP版本,Demo版本用了1个月,MVP版本用了3个月。

为什么强调MVP版本?因为做这个项目需要老板和业务的支持。大概在2011年左右,我们整个生产环境估计也有一千台机器(虚拟机),一旦出现问题就到运维那边看日志,看日志的痛苦大家都应该理解,这时候发现一台机器核心服务出错,可能会导致更多的问题。我们就做了MVP版本解决这个问题,当时我们大概做了两个功能:一个是实时知道所有的API接口访问量成功率等;第二是实时能在CAT平台上看到异常日志。这里我想说的是MVP版本不要做太多内容,但是在做一个产品的时候必须从MVP版本做起,要做一些最典型特别亮眼的功能让大家支持你。

二、数据质量。数据质量是整个监控体系里面非常关键,它决定你最后的监控报表质量。所以我们要和跟数据库框架、缓存框架、RPC框架、Web框架等做深入的集成,让业务方便收集以及看到这些数据。

三、单机开发环境,这也是我们认为对整个项目开发效率提升最重要的一点。单机开发环境实际上就是说你在一台机器里可以把你所有的项目都启起来。如果你在一个单机环境下把所有东西启动起来,你就会想方设法地知道我依赖的服务挂了我怎么办?比如CAT依赖了HDFS。单机开发环境除了大幅度提高你的项目开发效率之外,还能提升你整个项目的可靠性。

四、最难的事情是项目上线推动。CAT整个项目大概有两三个人,当时白天都是支持业务上线,培训,晚上才能code,但是一旦随着产品和完善以及业务使用逐渐变多,一些好的产品后面会形成良性循环,推广就会变得比较容易。

五、开放生态。公司越大监控的需求越多,报表需求也更多,比如我们美团点评,产品有很多报表,整个技术体系里面也有很多报表非常多的自定义报表,很多业务方都提各自的需求。最后我们决定把整个CAT系统里面所有的数据都作为API暴露出去,这些需求并不是不能支持,而是这事情根本是做不完的。美团点评内部下游有很多系统依赖CAT的数据,来做进一步的报表展示。

CAT项目从2011年开始做,到现在整个生产环境大概有三千应用,监控的服务端从零到几千,再到今天的两万多的规模,整个项目是从历时看起来是一个五年多的项目,但即使是做了五年多的这样一个项目,目前还有很多的需求需要开发。这边也打个广告,我们团队急缺人,欢迎对监控系统研发有兴趣的同学加入,请联系yong.you@dianping.com.

原文出自:https://tech.meituan.com/archives

Storm 的可靠性保证测试

美团点评阅读(10)

Storm 是一个分布式的实时计算框架,可以很方便地对流式数据进行实时处理和分析,能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成,从而为业务方决策提供实时的数据支持。

在美团点评公司内部,实时计算主要应用场景包括实时日志解析、用户行为分析、实时消息推送、消费趋势展示、实时新客判断、实时活跃用户数统计等。这些数据提供给各事业群,并作为他们实时决策的有力依据,弥补了离线计算“T+1”的不足。

在实时计算中,用户不仅仅关心时效性的问题,同时也关心消息处理的成功率。本文将通过实验验证 Storm 的消息可靠性保证机制,文章分为消息保证机制、测试目的、测试环境、测试场景以及总结等五节。

Storm 的消息保证机制

Storm 提供了三种不同层次的消息保证机制,分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理。

消息完全处理

每个从 Spout(Storm 中数据源节点)发出的 Tuple(Storm 中的最小消息单元)可能会生成成千上万个新的 Tuple,形成一棵 Tuple 树,当整棵 Tuple 树的节点都被成功处理了,我们就说从 Spout 发出的 Tuple 被完全处理了。 我们可以通过下面的例子来更好地诠释消息被完全处理这个概念:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KafkaSpout(spoutConfig), spoutNum);
builder.setBolt("split", new SplitSentence(), 10)
    .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
    .fieldsGrouping("split", new Fields("word"));

这个 Topology 从 Kafka(一个开源的分布式消息队列)读取信息发往下游,下游的 Bolt 将收到的句子分割成单独的单词,并进行计数。每一个从 Spout 发送出来的 Tuple 会衍生出多个新的 Tuple,从 Spout 发送出来的 Tuple 以及后续衍生出来的 Tuple 形成一棵 Tuple 树,下图是一棵 Tuple 树示例:

Tuple 树示例图

上图中所有的 Tuple 都被成功处理了,我们才认为 Spout 发出的 Tuple 被完全处理。如果在一个固定的时间内(这个时间可以配置,默认为 30 秒),有至少一个 Tuple 处理失败或超时,则认为整棵 Tuple 树处理失败,即从 Spout 发出的 Tuple 处理失败。

如何实现不同层次的消息保证机制

spout_bolt_acker

Tuple 的完全处理需要 Spout、Bolt 以及 Acker(Storm 中用来记录某棵 Tuple 树是否被完全处理的节点)协同完成,如上图所示。从 Spout 发送 Tuple 到下游,并把相应信息通知给 Acker,整棵 Tuple 树中某个 Tuple 被成功处理了都会通知 Acker,待整棵 Tuple 树都被处理完成之后,Acker 将成功处理信息返回给 Spout;如果某个 Tuple 处理失败,或者超时,Acker 将会给 Spout 发送一个处理失败的消息,Spout 根据 Acker 的返回信息以及用户对消息保证机制的选择判断是否需要进行消息重传。

Storm 提供的三种不同消息保证机制中。利用 Spout、Bolt 以及 Acker 的组合我们可以实现 At Most Once 以及 At Least Once 语义,Storm 在 At Least Once 的基础上进行了一次封装(Trident),从而实现 Exactly Once 语义。

Storm 的消息保证机制中,如果需要实现 At Most Once 语义,只需要满足下面任何一条即可:

  • 关闭 ACK 机制,即 Acker 数目设置为 0
  • Spout 不实现可靠性传输
    • Spout 发送消息是使用不带 message ID 的 API
    • 不实现 fail 函数
  • Bolt 不把处理成功或失败的消息发送给 Acker

如果需要实现 At Least Once 语义,则需要同时保证如下几条:

  • 开启 ACK 机制,即 Acker 数目大于 0
  • Spout 实现可靠性传输保证
    • Spout 发送消息时附带 message 的 ID
    • 如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数
  • Bolt 在处理成功或失败后需要调用相应的方法通知 Acker

实现 Exactly Once 语义,则需要在 At Least Once 的基础上进行状态的存储,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现。

下图中,每种消息保证机制中左边的字母表示上游发送的消息,右边的字母表示下游接收到的消息。从图中可以知道,At Most Once 中,消息可能会丢失(上游发送了两个 A,下游只收到一个 A);At Least Once 中,消息不会丢失,可能重复(上游只发送了一个 B ,下游收到两个 B);Exactly Once 中,消息不丢失、不重复,因此需要在 At Least Once 的基础上保存相应的状态,表示上游的哪些消息已经成功发送到下游,防止同一条消息发送多次给下游的情况。

三种消息保证机制比较图

测试目的

Storm 官方提供 At Most Once、At Least Once 以及 Exactly Once 三种不同层次的消息保证机制,我们希望通过相关测试,达到如下目的:

  • 三种消息保证机制的表现,是否与官方的描述相符;
  • At Most Once 语义下,消息的丢失率和什么有关系、关系如何;
  • At Least Once 语义下,消息的重复率和什么有关系、关系如何。

测试环境

本文的测试环境如下: 每个 worker(worker 为一个 物理 JVM 进程,用于运行实际的 Storm 作业)分配 1 CPU 以及 1.6G 内存。Spout、Bolt、Acker 分别跑在单独的 worker 上。并通过在程序中控制抛出异常以及人工 Kill Spout/Bolt/Acker 的方式来模拟实际情况中的异常情况。

三种消息保证机制的测试均由 Spout 从 Kafka 读取测试数据,经由相应 Bolt 进行处理,然后发送到 Kafka,并将 Kafka 上的数据同步到 MySQL 方便最终结果的统计,如下图所示:

测试流程示意图

测试数据为 Kafka 上顺序保存的一系列纯数字,数据量分别有十万、五十万、一百万等,每个数字在每个测试样例中出现且仅出现一次。

测试场景

对于三种不同的消息保证机制,我们分别设置了不同的测试场景,来进行充分的测试。其中为了保证 Spout/Bolt/Acker 发生异常的情况下不影响其他节点,在下面的测试中,所有的节点单独运行在独立的 Worker 上。

At Most Once

从背景中可以得知,如果希望实现 At Most Once 语义,将 Acker 的数目设置为 0 即可,本文的测试过程中通过把设置 Acker 为 0 来进行 At Most Once 的测试。

输入数据

保存在 Kafka 上的一系列纯数字,数据量从十万到五百万不等,每个测试样例中,同一个数字在 Kafka 中出现且仅出现一次。

测试结果

异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量
050000050000000%0
01000000100000000%0
02000000200000000%0
03000000300000000%0
异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量
1300000027749402250607.50%0
23000000230708769291323.09%0
33000000208282391717730.57%0
430000001420725157927552.64%0

结论

不发生异常的情况下,消息能够不丢不重;Bolt 发生异常的情况下,消息会丢失,不会重复,其中消息的丢失数目异常次数正相关。与官方文档描述相符,符合预期。

At Least Once

为了实现 At Least Once 语义,需要 Spout、Bolt、Acker 进行配合。我们使用 Kafka-Spout 并通过自己管理 offset 的方式来实现可靠的 Spout;Bolt 通过继承 BaseBasicBolt,自动帮我们建立 Tuple 树以及消息处理之后通知 Acker;将 Acker 的数目设置为 1,即打开 ACK 机制,这样整个 Topology 即可提供 At Least Once 的语义。

测试数据

Kafka 上保存的十万到五十万不等的纯数字,其中每个测试样例中,每个数字在 Kafka 中出现且仅出现一次。

测试结果

Acker 发生异常的情况

异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量最大积压量
010000010000002000(默认值)
020000020000002000
030000030000002000
040000040000002000
异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量最大积压量
11000001000002200002000
21000001000002400102000
31000001000002600002000
41000001000002800002000

Spout 发生异常的情况

异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
01000001000000
02000002000000
03000003000000
04000004000000
异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
1100000100000220520
2100000100000244140
4100000100000290080
6100000100000296900
316750

Bolt 发生异常的情况

调用 emit 函数之前发生异常

异常次数结果集中不重复的 Tuple 数数据重复的次数 (>1)出现重复的 Tuple 数数据丢失量
01000000
02000000
03000000
04000000
异常次数结果集中不重复的 Tuple 数数据重复的次数 (>1)出现重复的 Tuple 数数据丢失量
11000000
21000000
41000000
81000000
101000000

调用 emit 函数之后发生异常

异常次数结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
01000000
02000000
03000000
04000000
异常次数结果集中不重复的 Tuple 数数据重复的次数(>1)出现重复的 Tuple 数数据丢失数量
1100000220
2100000230
4100000250
8100000290
101000002110

结论

从上面的表格中可以得到,消息不会丢失,可能发生重复,重复的数目与异常的情况相关。

  • 不发生任何异常的情况下,消息不会重复不会丢失。
  • Spout 发生异常的情况下,消息的重复数目约等于 spout.max.pending(Spout 的配置项,每次可以发送的最多消息条数) * NumberOfException(异常次数)。
  • Acker 发生异常的情况下,消息重复的数目等于 spout.max.pending * NumberOfException。
  • Bolt 发生异常的情况:
    • emit 之前发生异常,消息不会重复。
    • emit 之后发生异常,消息重复的次数等于异常的次数。

结论与官方文档所述相符,每条消息至少发送一次,保证数据不会丢失,但可能重复,符合预期。

Exactly Once

对于 Exactly Once 的语义,利用 Storm 中的 Trident 来实现。

测试数据

Kafka 上保存的一万到一百万不等的数字,每个数字在每次测试样例中出现且仅出现一次。

测试结果

Spout 发生异常情况

异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

Acker 发生异常的情况

异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

Bolt 发生异常的情况

异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

结论

在所有情况下,最终结果集中的消息不会丢失,不会重复,与官方文档中的描述相符,符合预期。

总结

对 Storm 提供的三种不同消息保证机制,用户可以根据自己的需求选择不同的消息保证机制。

不同消息可靠性保证的使用场景

对于 Storm 提供的三种消息可靠性保证,优缺点以及使用场景如下所示:

可靠性保证层次优点缺点使用场景
At most once处理速度快数据可能丢失都处理速度要求高,且对数据丢失容忍度高的场景
At least once数据不会丢失数据可能重复不能容忍数据丢失,可以容忍数据重复的场景
Exactly once数据不会丢失,不会重复处理速度慢对数据不丢不重性质要求非常高,且处理速度要求没那么高,比如支付金额

如何实现不同层次的消息可靠性保证

对于 At Least Once 的保证需要做如下几步:

  1. 需要开启 ACK 机制,即 Topology 中的 Acker 数量大于零;
  2. Spout 是可靠的。即 Spout 发送消息的时候需要附带 msgId,并且实现失败消息重传功能(fail 函数 ,可以参考下面的 Spout 代码);
  3. Bolt 在发送消息时,需要调用 emit(inputTuple, outputTuple)进行建立 anchor 树(参考下面建立 anchor 树的代码),并且在成功处理之后调用 ack ,处理失败时调用 fail 函数,通知 Acker。

不满足以上三条中任意一条的都只提供 At Most Once 的消息可靠性保证,如果希望得到 Exactly Once 的消息可靠性保证,可以使用 Trident 进行实现。

不同层次的可靠性保证如何实现

如何实现可靠的 Spout

实现可靠的 Spout 需要在 nextTuple 函数中发送消息时,调用带 msgID 的 emit 方法,然后实现失败消息的重传(fail 函数),参考如下示例:

/**
     * 想实现可靠的 Spout,需要实现如下两点
     * 1. 在 nextTuple 函数中调用 emit 函数时需要带一个     msgId,用来表示当前的消息(如果消息发送失败会用 msgId 作为参数回调 fail 函数)
     * 2. 自己实现 fail 函数,进行重发(注意,在 storm 中没有 msgId 和消息的对应关系,需要自己进行维护)
     */
public void nextTuple() {
    //设置 msgId 和 Value 一样,方便 fail 之后重发
    collector.emit(new Values(curNum + "", round +     ""), curNum + ":" + round);
}

@Override
public void fail(Object msgId) {//消息发送失败时的回调函数
String tmp = (String)msgId;   //上面我们设置了 msgId 和消息相同,这里通过 msgId 解析出具体的消息
String[] args = tmp.split(":");

//消息进行重发
collector.emit(new Values(args[0], args[1]), msgId);
}

如何实现可靠的 Bolt

Storm 提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt,都可以实现可靠性消息传递,不过 BaseRichBolt 需要自己做很多周边的事情(建立 anchor 树,以及手动 ACK/FAIL 通知 Acker),使用场景更广泛,而 BaseBasicBolt 则由 Storm 帮忙实现了很多周边的事情,实现起来方便简单,但是使用场景单一。如何用这两个 Bolt 实现(不)可靠的消息传递如下所示:

//BaseRichBolt 实现不可靠消息传递
public class SplitSentence extends BaseRichBolt {//不建立 anchor 树的例子
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            _collector.emit(new Values(word));  // 不建立 anchor 树
        }
        _collector.ack(tuple);          //手动 ack,如果不建立 anchor 树,是否 ack 是没有区别的,这句可以进行注释
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

//BaseRichBolt 实现可靠的 Bolt
public class SplitSentence extends BaseRichBolt {//建立 anchor 树以及手动 ack 的例子
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            _collector.emit(tuple, new Values(word));  // 建立 anchor 树
        }
        _collector.ack(tuple);          //手动 ack,如果想让 Spout 重发该 Tuple,则调用 _collector.fail(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

下面的示例会可以建立 Multi-anchoring
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

//BaseBasicBolt 是吸纳可靠的消息传递
public class SplitSentence extends BaseBasicBolt {//自动建立 anchor,自动 ack
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

Trident

在 Trident 中,Spout 和 State 分别有三种状态,如下图所示:

Trident Spout 和 State 的状态图

其中表格中的 Yes 表示相应的 Spout 和 State 组合可以实现 Exactly Once 语义,No 表示相应的 Spout 和 State 组合不保证 Exactly Once 语义。下面的代码是一个 Trident 示例:

     OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);   //Opaque Spout
    //TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf);   //Transaction Spout

    TridentTopology topology = new TridentTopology();
    String spoutTxid = Utils.kafkaSpoutGroupIdBuilder(topologyConfig.kafkaSrcTopic, topologyConfig.topologyName);
    Stream stream = topology.newStream(spoutTxid, spout)
            .name("new stream")
            .parallelismHint(1);

    // kafka config
    KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();      //KafkaProducerConfig 仅对 kafka 相关配置进行了封装,具体可以参考 TridentKafkaStateFactory2(Map<String, String> config)
    Map<String, String> kafkaConfigs = kafkaProducerConfig.loadFromConfig(topologyConfig);
    TridentToKafkaMapper tridentToKafkaMapper = new TridentToKafkaMapper();  //TridentToKafkaMapper 继承自 TridentTupleToKafkaMapper<String, String>,实现 getMessageFromTuple 接口,该接口中返回 tridentTuple.getString(0);

    String  dstTopic = "test__topic_for_all";

    TridentKafkaStateFactory2 stateFactory = new TridentKafkaStateFactory2(kafkaConfigs);
    stateFactory.withTridentTupleToKafkaMapper(tridentToKafkaMapper);
    stateFactory.withKafkaTopicSelector(new DefaultTopicSelector(dstTopic));

    stream.each(new Fields("bytes"), new AddMarkFunction(), new Fields("word")) //从spout 出来数据是一个 bytes 类型的数据,第二个是参数是自己的处理函数,第三个参数是处理函数的输出字段
            .name("write2kafka")
            .partitionPersist(stateFactory         //将数据写入到 Kafka 中,可以保证写入到 Kafka 的数据是 exactly once 的
                    , new Fields("word")
                    , new TridentKafkaUpdater())
            .parallelismHint(1);

原文出自:https://tech.meituan.com/archives

分布式会话跟踪系统架构设计与实践

美团点评阅读(29)

本文整理自美团点评技术沙龙第08期:大规模集群的服务治理设计与实践。

美团点评技术沙龙由美团点评技术团队主办,每月一期。每期沙龙邀请美团点评及其它互联网公司的技术专家分享来自一线的实践经验,覆盖各主要技术领域。

目前沙龙会分别在北京、上海和厦门等地举行,要参加下一次最新沙龙活动?赶快关注微信公众号“美团点评技术团队”。

这期沙龙主要内容有:分布式服务通信框架及服务治理系统、分布式监控系统实践、分布式会话跟踪系统架构设计与实践,特邀美恰CTO讲解时下热门话题“微服务”。其中既包括关键系统设计、在美团点评内部的实践经验,也包括一些项目在业界开源的运营实践。

前言

随着美团点评的业务发展,公司的分布式系统变得越来越复杂,我们亟需一个工具能够梳理内部服务之间的关系,感知上下游服务的形态。比如一次请求的流量从哪个服务而来、最终落到了哪个服务中去?服务之间是RPC调用,还是HTTP调用?一次分布式请求中的瓶颈节点是哪一个,等等。

简介

MTrace,美团点评内部的分布式会话跟踪系统,其核心理念就是调用链:通过一个全局的ID将分布在各个服务节点上的同一次请求串联起来,还原原有的调用关系、追踪系统问题、分析调用数据、统计系统指标。这套系统借鉴了2010年Google发表的一篇论文《dapper》,并参考了Twitter的Zipkin以及阿里的Eagle Eye的实现。
那么我们先来看一下什么是调用链,调用链其实就是将一次分布式请求还原成调用链路。显式的在后端查看一次分布式请求的调用情况,比如各个节点上的耗时、请求具体打到了哪台机器上、每个服务节点的请求状态,等等。它能反映出一次请求中经历了多少个服务以及服务层级等信息(比如你的系统A调用B,B调用C,那么这次请求的层级就是3),如果你发现有些请求层级大于10,那这个服务很有可能需要优化了。

网络优化

如上图所示,红框内显示了一次分布式请求经过各个服务节点的具体IP,通过该IP就可以查询一次分布式请求是否有跨机房调用等信息,优化调用链路的网络结构。

瓶颈查询

再比如上图,红框部分显示的是系统调用的瓶颈节点,由于该节点的耗时,导致了整个系统调用的耗时延长,因此该节点需要进行优化,进而优化整个系统的效率。这种问题通过调用链路能很快发现下游服务的瓶颈节点;但是假如没有这样的系统,我们会怎样做呢?首先我会发现下游服务超时造成了我的服务超时,这时我会去找这个下游服务的负责人,然后该负责人发现也不是他自己服务的问题,而是他们调用了其他人的接口造成的问题,紧接着他又去找下游的服务负责人。我们都知道跨部门之间的沟通成本很高的,这么找下去会花费大量的不必要时间,而有了MTrace之后,你只需要点开链路就能发现超时问题的瓶颈所在。

优化链路

我们再来看下上面这张图,红框部分都是同一个接口的调用,一次请求调用相同的接口10几次甚至是几十次,这是我们不想看到的事情,那么整个系统能不能对这样的请求进行优化,比如改成批量接口或者提高整个系统调用的并行度?在美团点评内部我们会针对这样的链路进行筛选分析,然后提供给业务方进行优化。

异常log绑定

通过MTrace不仅能做上述这些事情,通过它的特性,还能携带很多业务感兴趣的数据。因为MTrace可以做到数据和一次请求的绑定以及数据在一次请求的网络中传递。比如一些关键的异常log,一般服务的异常log很有可能是因为上游或者下游的异常造成的,那就需要我们手动地对各个不同服务的异常log做mapping。看这次的异常log对应到上游服务的哪个log上,是不是因为上游传递的一些参数造成了该次异常?而通过MTrace就可以将请求的参数、异常log等信息通过traceId进行绑定,很容易地就把这些信息聚合到了一起,方便业务端查询问题。

透明传输数据

业务端往往有这样的需求,它希望一些参数能在一次分布式请求一直传递下去,并且可以在不同的RPC中间件间传递。MTrace对该类需求提供了两个接口:

put(map<String, String> data)
putOnce(map<String, String> data)
  • put 接口:参数可以在一次分布式请求中一直传递。
  • putOnce 接口:参数在一次分布式请求中只传递一级。

如下图所示

  • 左侧绿色部分是put接口,service中调用了put接口传递了uid=123456这个参数,它会在网络中一直传递,可以在服务A中通过get(“uid”)的方式获取参数值,也可以在服务C中通过get(“uid”)的方式获取参数值。
  • 右侧蓝色部分是putOnce接口,service中调用了putOnce接口传递pid=11111,它只会传递一级,可以在服务B中通过get(“pid”)的方式获取参数值,但是在服务D中就获取不到pid的值了。

以上的两种接口可以用于业务自定义传递数据,比如通过传递一个服务标识,用于AB test,下游的所有服务获取到test的标识就会走test的策略,即上游服务可以传递一些参数,控制所有下游服务的逻辑。当然业务也可以通过该接口传递一些临时性的数据。

系统架构

主要分为三层:数据埋点上报、数据收集计算、数据前端展示。

基本概念

traceId

全局唯一,64位整数,用于标识一次分布式请求,会在RPC调用的网络中传递。

spanId

签名方式生成:0, 0.1, 0.1.1, 0.2。用于标识一次RPC在分布式请求中的位置,比如0.2就是0节点服务调用的第二个服务。

annotation

业务端自定义埋点,业务感兴趣的想上传到后端的数据,比如该次请求的用户ID等。

数据埋点

埋点SDK

提供统一的SDK,在各个中间件中埋点,生成traceID等核心数据,上报服务的调用数据信息。

  • 生成调用上下文;
  • 同步调用上下文存放在ThreadLocal, 异步调用通过显式调用API的方式支持;
  • 网络中传输关键埋点数据,用于中间件间的数据传递,支持Thrift, HTTP协议。

业内有些系统是使用注解的方式实现的埋点,这种方式看似很优雅,但是需要业务方显式依赖一些AOP库,这部分很容易出现问题,因为AOP方式太过透明,导致查问题很麻烦,而且业务方配置的东西越多越容易引起一些意想不到的问题,所以我们的经验是尽量在各个统一的中间件中进行显式埋点,虽然会导致代码间耦合度增加,但是方便后续定位问题。其次,为了整个框架的统一,MTrace并非仅支持Java一种语言,而AOP的特性很多语言是不支持的。

Agent

  • 透传数据,用作数据转发;
  • 做流量控制;
  • 控制反转,很多策略可以通过agent实现,而不需要每次都升级业务代码中的SDK。

Agent仅仅会转发数据,由Agent判断将数据转发到哪里,这样就可以通过Agent做数据路由、流量控制等操作。也正是由于Agent的存在,使得我们可以在Agent层实现一些功能,而不需要业务端做SDK的升级,要知道业务端SDK升级的过程是很缓慢的,这对于整个调用链的系统来说是不可接受的,因为MTrace整个系统是针对庞大的分布式系统而言的,有一环的服务缺失也会造成一定的问题。

目前MTrace支持的中间件有:

  • 公司内部RPC中间件
  • http中间件
  • mysql中间件
  • tair中间件
  • mq中间件

数据埋点的四个阶段:

  • Client Send : 客户端发起请求时埋点,需要传递一些参数,比如服务的方法名等

      Span span = Tracer.clientSend(param);
    
  • Server Recieve : 服务端接收请求时埋点,需要回填一些参数,比如traceId,spanId

      Tracer.serverRecv(param);
    
  • ServerSend : 服务端返回请求时埋点,这时会将上下文数据传递到异步上传队列中

      Tracer.serverSend();
    
  • Client Recieve : 客户端接收返回结果时埋点,这时会将上下文数据传递到异步上传队列中

      Tracer.clientRecv();
    

埋点上下文

上图CS、SR为创建上下文的位置,CR、SS为归档上下文的位置。

上下文归档

上下文归档,会把上下文数据异步上传到后端,为了减轻对业务端的影响,上下文上报采用的是异步队列的方式,数据不会落地,直接通过网络形式传递到后端服务,在传递之前会对数据做一层压缩,主要是压缩比很可观,可以达到10倍以上,所以就算牺牲一点CPU资源也是值得的。具体上报的数据如图所示:

我们之前在数据埋点时遇到了一些问题:

  • 异步调用
    • 异步IO造成的线程切换,不能通过ThreadLocal传递上下文。
    • 显式的通过API进行埋点传递,切换前保存,切换后还原。
    • 提供封装好的ThreadPool库。
  • 数据量大,每天千亿级别的数据
    • 批量上报
    • 数据压缩
    • 极端情况下采样

数据存储

Kafka使用

我们在SDK与后端服务之间加了一层Kafka,这样做既可以实现两边工程的解耦,又可以实现数据的延迟消费。我们不希望因为瞬时QPS过高而引起的数据丢失,当然为此也付出了一些实效性上的代价。

实时数据Hbase

调用链路数据的实时查询主要是通过Hbase,使用traceID作为RowKey,能天然的把一整条调用链聚合在一起,提高查询效率。

离线数据Hive

离线数据主要是使用Hive,可以通过SQL进行一些结构化数据的定制分析。比如链路的离线形态,服务的出度入度(有多少服务调用了该服务,该服务又调用了多少下游服务)

前端展示

前端展示,主要遇到的问题是NTP同步的问题,因为调用链的数据是从不同机器上收集上来的,那么聚合展示的时候就会有NTP时间戳不同步的问题,这个问题很难解决,于是我们采取的方式是前端做一层适配,通过SpanId定位调用的位置而不是时间,比如0.2一定是发生在0.1这个Span之后的调用,所以如果时间出现漂移,就会根据SpanId做一次校正。即判断时间顺序的优先级为最高是spanid,然后是时间戳。

总结

核心概念:调用链;
用途:定位系统瓶颈,优化系统结构、统计系统指标、分析系统数据;
架构:埋点上报、收集计算、展示分析。

分布式会话跟踪系统主要的特点就是能关联服务之间的联动关系,通过这层关系可以延伸出来很多有意义的分析数据,统计数据。为优化系统结构,查询系统瓶颈问题带来了极大的便利。

原文出自:https://tech.meituan.com/archives

美团数据仓库-数据脱敏

美团点评阅读(29)

背景与目标

在数据仓库建设过程中,数据安全扮演着重要角色,因为隐私或敏感数据的泄露,会对数据主体(客户,员工和公司)的财产、名誉、人身安全、以及合法利益造成严重损害。因此我们需要严格控制对仓库中的数据访问,即什么样的人员或者需求才可以访问到相关的数据。这就要求对数据本身的敏感程度进行安全级别划分。数据有了安全等级的划分,才能更好管理对数据访问控制,以此来保护好数据安全。
举个例子简单的说明下,例如我们仓库中有一张关于注册用户的基本信息表User,其中有手机号mobile,昵称username两个字段。我们在划分数据安全层级的时,将用户mobile的安全等级划分为L2要高于username的等级L1,并规定只有访问权限达到L2的运营部门才能访问mobile字段。这样在公司各个部门需要访问注册用户基本信息表User时,我们只需检查访问者是否来自运营部门,如果是运营部可以访问mobile,如果不是只能访问username信息了。这样就有效的防止用户手机号被不相关工作人员泄露出去,同时也不影响查询用户username的需求。
但是往往在实际生产过程中,应用场景会更加复杂,仅靠类似这样的访问控制,满足不了生产的需要,还需要结合其它的途径,而数据脱敏就是一种有效的方式,既能满足日常生产的需要,又能保护数据安全。
数据脱敏,具体指对某些敏感信息通过脱敏规则进行数据的变形,实现敏感隐私数据的可靠保护。这样可以使数据本身的安全等级降级,就可以在开发、测试和其它非生产环境以及外包或云计算环境中安全地使用脱敏后的真实数据集。借助数据脱敏技术,屏蔽敏感信息,并使屏蔽的信息保留其原始数据格式和属性,以确保应用程序可在使用脱敏数据的开发与测试过程中正常运行。

敏感数据梳理

在数据脱敏进行之前,我们首先要确定哪些数据要作为脱敏的目标。我们根据美团特有的业务场景和数据安全级别划分(绝密、高保密、保密、可公开,四个级别), 主要从“高保密”等级的敏感数据,开始进行梳理。
这里我们把敏感数据分成四个维度进行梳理,用户、商家、终端、公司。

  1. 从用户维度进行梳理可能有这些敏感字段如下:手机号码、邮件地址、账号、地址、固定电话号码等信息(此外个人隐私数据相关还有如:种族、政治观点、宗教信仰、基因等)
  2. 从商家维度进行梳理:合同签订人,合同签订人电话等(不排除全局敏感数据:如商家团购品类等)
  3. 从用户终端维度进行梳理:能够可能标识终端的唯一性字段,如设备id。
  4. 从公司角度进行梳理:交易金额、代金卷密码、充值码等

确定脱敏处理方法

梳理出了敏感数据字段,我们接下来的工作就是如何根据特定的应用场景对敏感字段实施具体的脱敏处理方法。
常见的处理方法如下几种有:

  1. 替换:如统一将女性用户名替换为F,这种方法更像“障眼法”,对内部人员可以完全保持信息完整性,但易破解。
  2. 重排:序号12345重排为54321,按照一定的顺序进行打乱,很像“替换”, 可以在需要时方便还原信息,但同样易破解。
  3. 加密:编号12345加密为23456,安全程度取决于采用哪种加密算法,一般根据实际情况而定。
  4. 截断:13811001111截断为138,舍弃必要信息来保证数据的模糊性,是比较常用的脱敏方法,但往往对生产不够友好。
  5. 掩码: 123456 -> 1xxxx6,保留了部分信息,并且保证了信息的长度不变性,对信息持有者更易辨别, 如火车票上得身份信息。
  6. 日期偏移取整:20130520 12:30:45 -> 20130520 12:00:00,舍弃精度来保证原始数据的安全性,一般此种方法可以保护数据的时间分布密度。

但不管哪种手段都要基于不同的应用场景,遵循下面两个原则:
1.remain meaningful for application logic(尽可能的为脱敏后的应用,保留脱敏前的有意义信息)
2.sufficiently treated to avoid reverse engineer(最大程度上防止黑客进行破解)
以这次脱敏一个需求为例:
美团一般的业务场景是这样的,用户在网站上付款一笔团购单之后,我们会将团购密码,发到用户对应的手机号上。这个过程中,从用户的角度来看团购密码在未被用户消费之前,对用户来说是要保密的,不能被公开的,其次美团用户的手机号也是要保密的,因为公开之后可能被推送一些垃圾信息,或者更严重的危害。从公司内部数据分析人员来看,他们有时虽然没有权限知道用户团购密码,但是他们想分析公司发送的团购密码数量情况,这是安全允许;再有数据分析人员虽然没有权限知道用户具体的手机号码,但是他们需要统计美团用户手机的地区分布情况,或者运营商分布差异,进而为更上层的决策提供支持。
根据这样的需求,我们可以对团购密码做加密处理保证其唯一性,也保留其原有的数据格式,在保密的同时不影响数据分析的需求。同样,我们将用户的手机号码的前7位,关于运营商和地区位置信息保留,后四位进行模糊化处理。这样同样也达到了保护和不影响统计的需求。

因此从实际出发遵循上面的两个处理原则,第一阶段我们在脱敏工具集中,确定了如下4种基本类型的脱敏方案(对应4个udf):

字段名称 脱敏方法 举例 脱敏原则
电话号码(moblie)掩码13812345678-> 13812340000防止号码泄露,但保留运营商和地区信息 (唯一性,由前端绑定或者注册时约束)
邮件(email)截断+ 加密hxs@163.com -> 6225888e3a1d4a139f5f5db98d846102b2cd0d@163.com保留邮件域信息
团购密码(code)加密4023926843399219 -> 1298078978加密后在一定精度上保持唯一性,并与数据类型一致
设备号(deviceid)加密ffbacff42826302d9e832b7e907a212a -> b9c2a61972a19bf21b06b0ddb8ba642d加密后保持唯一性

确定实施范围与步骤

通过上面字段的梳理和脱敏方案的制定,我们对美团数据仓库中涉及到得敏感字段的表进行脱敏处理。在数据仓库分层理论中,数据脱敏往往发生在上层,最直接的是在对外开放这一层面上。在实际应用中,我们既要参考分层理论,又要从美团现有数据仓库生产环境的体系出发,主要在数据维度层(dim),以及基础服务数据层(fact)上实施脱敏。这样,我们可以在下游相关数据报表以及衍生数据层的开发过程中使用脱敏后的数据,从而避免出现数据安全问题。
确认处理的表和字段后,我们还要确保相关上下游流程的正常运行, 以及未脱敏的敏感信息的正常产出与存储(通过更严格的安全审核来进行访问)。
以用户信息表user为例,脱敏步骤如下:
1.首先生产一份ndm_user未脱敏数据,用于未脱敏数据的正常产出。
2.对下游涉及的所有依赖user生产流程进行修改,来确保脱敏后的正常运行,这里主要是确认数据格式,以及数据源的工作。
3.根据对应的脱敏方法对user表中对应的字段进行脱敏处理。

总结

通过上面的几个步骤的实施,我们完成了第一阶段的数据脱敏工作。在数据脱敏方案设计与实施过程中, 我们觉得更重要的还是从特定的应用场景出发进行整体设计,兼顾了数据仓库建设这一重要考量维度。数据脱敏实施为公司数据安全的推进,提供了有力支持。当然,我们第一阶段脱敏的工具集还相对较少,需要补充。 脱敏的技术架构还有待完善和更加自动化。
本文关于数据安全和数据访问隔离的控制阐述较少,希望通过以后的生产实践,继续为大家介绍。

参考

参考文献如下:

  1. http://en.wikipedia.org/wiki/Data_masking
  2. http://www.prnews.cn/press_release/51034.htm

原文出自:https://tech.meituan.com/archives

CentOS6上Hadoop集群中服务器cpu sys态异常的定位与解决

美团点评阅读(22)

问题现象

  1. 在zabbix系统中,对Hadoop集群的历史监控数据分析时,发现在执行大Job任务时,某些服务节点的cpu sys态很高;
  2. 具体以hadoop_A服务节点为例,在10:15-10:40这个时间段,cpu user态为60%,而sys态则高达35%;
  3. 对于整个Hadoop集群,并不是所有的节点都会出现sys过高的问题,产生此类问题的都是部署CentOS6系统的节点。

定位分析

  1. 根据zabbix系统中cpu sys很高的问题发生时间,找到触发问题的大Job,以便于后面的问题重现和问题验证;
  2. 对问题节点hadoop_A的硬件信息和OS系统日志/var/log/messages进行初步检查,并未发现异常;
  3. 重启Job,重现问题。并使用nmon工具对问题节点hadoop_A的资源负载进行粗粒度的实时监测;
    hadoop_A节点上某一时刻瞬时的负载状态如下图:
    Alt text
  4. 通过上图,注意到网络流量达到了119.7MB/s,接收和发送的峰值都超过了120MB/s,初步怀疑网口在某一时间成为瓶颈,导致内核的sys过高。对hadoop_A的网口计数器细化分析,系统在uptime了83天的状态下,网口计数器中除overruns指标达22万之外,其他的网络指标正常。 这说明网络确实曾达到过峰值,也丢过包,但频率非常低,sys过高的问题应该不是网络负载过高触发。
    ifconfig查询网口的计数器状态如下图:
    Alt text
  5. 需要对系统进行更细粒度的分析,找出系统sys态消耗在什么地方。在hadoop_A节点上部署perf工具,通过perf top对kernel事件采样,实时分析内核事件。
    perf top在某一时刻的状态图如下:
    Alt text
    通过perf top监控可以断定:kernel中存在频繁的spin_lock_irqsave内核系统调用, sys态消耗过高应该与此有关。
  6. 重启Job,再次重现问题,并利用perf工具对内核函数的调用关系采样:
    perf record -a -g -F 1000 sleep 30
    采样结束后,在当前目录上会生成一个perf.data文件,使用perf工具查看函数调用关系:
    perf report -g
    perf report查看到的调用关系如下图:
    Alt text
  7. 通过调用依赖关系分析,spin_lock_irqsave主要called by compaction_alloc,初步推测问题由kernel的内存管理部分触发。联想到centos 6相对于centos 5在kernel内存管理模块的一些改进点(如transparent huge page, 基于numa的内存分配等),有没有可能是CentOS6新增的THP特性导致cpu sys过高?再在google上搜一把相关函数名的关键字,印证这个猜测。

问题验证

  1. 选择在节点hadoop_A上面做验证测试,通过以下内核参数优化关闭系统THP特性:
     echo never > /sys/kernel/mm/redhat_transparent_hugepage/enabled
     echo never > /sys/kernel/mm/redhat_transparent_hugepage/defrag
    
  2. 重启触发问题的大Job,在hadoop_A节点未出现cpu sys 状态过高的现象。
  3. 在生产系统上运行24小时后,通过zabbix系统观察,其他内核未优化节点如hadoop_B,hadoop_C等节点依然存在cpu sys态过高的问题,而关闭了THP特性的hadoop_A节点没有出现cpu sys态过高的问题,验证了之前的分析。
    hadoop_B和hadoop_C依然存在cpu sys态过高的问题:
    Alt text
    hadoop_A cpu sys态正常:
    Alt text

结论

将Hadoop集群中所有CentOS6类型节点的THP特性关闭掉(在CentOS6中,THP特性默认都是打开的),关闭方法如下:

    echo never > /sys/kernel/mm/redhat_transparent_hugepage/enabled
    echo never > /sys/kernel/mm/redhat_transparent_hugepage/defrag

值得注意的是,需要在puppet系统中部署该项优化,以免节点重启导致修改丢失。

参考

事后,在redhat官网和cloudera官网也搜到了相关的内容,附录下来,供参考。

  1. 在redhat的官网上,有对THP特性的细化说明:
    https://access.redhat.com/site/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Performance_Tuning_Guide/s-memory-transhuge.html
  2. 在cloudera的CDH4部署说明中,也建议将系统的THP的compaction特性关闭:
    http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH4/4.2.2/CDH4-Installation-Guide/cdh4ig_topic_11_6.html

原文出自:https://tech.meituan.com/archives

关于Relay Log无法自动删除的问题

青禾阅读(15)

本文介绍了一次运维实践中relay-log长期无法自动删除的原因和解决过程

背景: 今天在运维一个mysql实例时,发现其数据目录下的relay-log 长期没有删除,已经堆积了几十个relay-log。 然而其他作为Slave服务器实例却没有这种情况。

现象分析

通过收集到的信息,综合分析后发现relay-log无法自动删除和以下原因有关。

  • 该实例原先是一个Slave:导致relay-log 和 relay-log.index的存在
  • 该实例目前已经不是Slave:由于没有了IO-Thread,导致relay-log-purge 没有起作用( 这也是其他Slave实例没有这种情况的原因,因为IO-thread会做自动rotate操作)。
  • 该实例每天会进行日常备份:Flush logs的存在,导致每天会生成一个relay-log
  • 该实例没有配置expire-logs-days:导致flush logs时,也不会做relay-log清除

简而言之就是: 一个实例如果之前是Slave,而之后停用了(stop slave),且没有配置expire-logs-days的情况下,会出现relay-log堆积的情况。

深入分析

顺带也和大家分享下MySQL 内部Logrotate的机制

Binary Log rotate机制:

  • Rotate:每一条binary log写入完成后,都会判断当前文件是否超过 max_binlog_size,如果超过则自动生成一个binlog file
  • Delete:expire-logs-days 只在 实例启动时 和 flush logs 时判断,如果文件访问时间早于设定值,则purge file

Relay Log rotate 机制:

  • Rotate:每从Master fetch一个events后,判断当前文件是否超过 max_relay_log_size 如果超过则自动生成一个新的relay-log-file
  • Delete:purge-relay-log 在SQL Thread每执行完一个events时判断,如果该relay-log 已经不再需要则自动删除
  • Delete:expire-logs-days 只在 实例启动时 和 flush logs 时判断,如果文件访问时间早于设定值,则purge file (同Binlog file) (updated: expire-logs-days和relaylog的purge没有关系)
    PS: 因此还是建议配置 expire-logs-days , 否则当我们的外部脚本因意外而停止时,还能有一层保障。

因此建议当slave不再使用时,通过reset slave来取消relaylog,以免出现relay-log堆积的情况。








原文出自:http://cenalulu.github.io/

Hadoop安全实践

美团点评阅读(15)

前言

在2014年初,我们将线上使用的 Hadoop 1.0 集群切换到 Hadoop 2.2.0 稳定版, 与此同时部署了 Hadoop 的安全认证。本文主要介绍在 Hadoop 2.2.0 上部署安全认证的方案调研实施以及相应的解决方法。

背景

集群安全措施相对薄弱

最早部署Hadoop集群时并没有考虑安全问题,随着集群的不断扩大, 各部门对集群的使用需求增加,集群安全问题就显得颇为重要。说到安全问题,一般包括如下方面:

  • 用户认证(Authentication)
    即是对用户身份进行核对, 确认用户即是其声明的身份, 这里包括用户和服务的认证。

  • 用户授权(Authorization)
    即是权限控制,对特定资源, 特定访问用户进行授权或拒绝访问。用户授权是建立再用户认证的基础上, 没有可靠的用户认证谈不上用户授权。

未开启安全认证时,Hadoop 是以客户端提供的用户名作为用户凭证, 一般即是发起任务的Unix 用户。一般线上机器部署服务会采用统一账号,当以统一账号部署集群时,所有执行 Hadoop 任务的用户都是集群的超级管理员,容易发生误操作。即便是以管理员账号部署集群,恶意用户在客户端仍然可以冒充管理员账号执行。

集群整体升级到 hadoop 2.0

2013年10月份 Hadoop 2.2.0 发布,作为 Apache Hadoop 2.X 的 GA 版本。我们考虑将集群整体升级 Hadoop 2.2.0,进入 yarn 时代。与此同时,我们计划在升级过程中一并把集群安全工作做到位,主要基于以下考虑:

  • 与升级工作一样,安全同样是基础工作,把安全搞好会方便我们后续的工作,否则会成为下一个阻碍。
  • 所谓基础工作,就是越往后改动越难的工作,目前不做,将来依赖更多,开展代价更大。

综上,我们的需求是在低版本hadoop升级到Yarn的过程中部署Hadoop安全认证,做好认证之后我们可以在此之上开启适当的权限控制(hdfs, 队列)。

方案调研

在方案调研之前先明确以下安全实践的原则,如下:

  • 做为一个后端服务平台,部署安全的主要目的是防止用户误操作导致的事故(比如误删数据,误操作等)
  • 做安全是为了开放,开放的前提是保证基本的安全,数据安全与平台安全
  • 在保证安全的前提下,尽量简化运维

分析我们遇到的问题,这里我们需要调研:

  • 账号拆分与相应管理方案
  • 开启 Hadoop 安全认证
  • 客户端针对安全认证的相应调整

账号拆分与相应管理方案

集群账号管理

原先我们使用单一账号作为集群管理员,且这一账号为线上统一登录账号, 这存在极大的安全隐患。我们需要使用特殊账号来管理集群。这里涉及的问题是,我们需要几个运维账号呢?
一种简单的做法是使用一个特殊运维账号(比如 hadoop), CDHApache官方 都推荐按服务划分分账号来启动集群:

User:GroupDaemons
hdfs:hadoopNameNode, Secondary NameNode, Checkpoint Node, Backup Node, DataNode
yarn:hadoopResourceManager, NodeManager
mapred:hadoopMapReduce JobHistory Server

考虑到精细化控制可以有效避免误操作,这里我们遵循官方的建议使用多账号。
在从单一运维账号迁移到多个账号部署时,需要考虑相关文件权限问题,包括本地以及hdfs两部分,这可以在安全部署上线时完成相应改动。

用户账号管理

美团很多小组都有使用 Hadoop 来进行大数据处理需求, 故需要一定程度的多租户环境, 这里主要考虑其中的数据和操作的权限问题。hdfs 本身仅提供类 Unix 的权限系统, 默认的组概念也相对鸡肋。鉴于此,在多用户的管理上可以有简单粗暴的方案:

不同组有各自的根目录,使用不同的账号,对组内文件有全部权限。不同组之间相互不能访问数据(除非手动修改)。

在一个集中的数据仓库环境下,又要生产各个部门的统计数据的话,上述策略不够灵活。目前Cloudera 有一个精细化权限控制的解决方案 sentry, 支持 Role based 的权限管理。由于其定制化较高,不方便使用, 故暂未考虑。

开启 Hadoop 安全认证

Hadoop 的安全认证是基于 Kerberos 实现的。 Kerberos 是一个网络身份验证协议,用户只需输入身份验证信息,验证通过获取票据即可访问多个接入 Kerberos 的服务, 机器的单点登录也可以基于此协议完成的。 Hadoop 本身并不创建用户账号,而是使用 Kerberos 协议来进行用户身份验证,从Kerberos凭证中的用户信息获取用户账号, 这样一来跟实际用户运行的账号也无关。

这里我们从 YARN 上的 MR 任务提交过程简单说明一下:
 Yarn 任务提交步骤

  1. 用户执行任务前,先通过KDC认证自己,获取TGT(Ticket Granting Ticket)。KDC是 Kerberos 认证的中心服务,存储用户和服务的认证信息。
  2. 用户通过 TGT 向 KDC 请求访问服务的Ticket, KDC 生成 session key 后一并发给客户端。
  3. 客户端通过 service ticket 向服务认证自己,完成身份认证。
  4. 完成身份认证后客户端向服务请求若干token供后续任务执行认证使用(比如 HDFS NameNode Delegation Token, YARN ResourceManager Delegation Token)
  5. 客户端连同获取到的 token 一并提交任务,后续任务执行使用 token 进行来自服务的认证

从上可以看出,出于性能的考虑,Hadoop 安全认证体系中仅在用户跟服务通信以及各个服务之间通信适用 Kerberos 认证,在用户认证后任务执行,访问服务,读取/写入数据等均采用特定服务(NameNode, Resource Manager)发起访问token,让需求方凭借 token 访问相应服务和数据。这里 token 的传递,认证以及更新不做深入讨论。

关于开启 Hadoop 安全认证, Cloudera 有详细的文档介绍。由于自身环境以及部署运维的考虑,最终的部署方案有些许出入, 一一说明。

Kerberos 部署

Hadoop 安全认证需要一个 Kerberos 集群, 部署 Kerberos 需要部署KDC。 由于我们的环境中使用 freeIPA 进行主机认证相关的权限控制,已经集成 Kerberos 服务, 故不需要另外部署。
Kerberos 相关的运维操作, 比如添加用户,服务,导出keytab,均可以通过 ipa 相关接口来进行。

Container 的选择

从上图可以看出用户发起的任务是在特定的容器(Container)内执行的, 一开始我们考虑使用DefaultContainer 而不是官方推荐的 LinuxContainer, 缺点是对任务之间的物理隔离以及防范恶意任务方面会有缺陷, 不过方便部署,使用LinuxContainer需要在集群各台机器上部署用户账号。
实际测试发现由于MAPREDUCE-5208的引入,在 hadoop 2.2.0 上开启安全认证后无法使用 DefaultContainer
这里不希望对代码有过多定制化的修改,我们考虑还是使用 LinuxContainer, 需要解决一下问题:

  • 用户账号创建
    我们需要在集群内添加所有可能的任务发起用户账号。借助 freeipa 的统一的用户管理 , 我们只需要在 freeipa 上添加相应用户即可。
  • container-executor 和 container-executor.cfg 的部署
    container-executor 作为Yarn 的 container 执行程序,有一系列的权限要求:

    Be owned by root
    Be owned by a group that contains only the user running the YARN daemons
    Be setuid
    Be group readable and executable

    配置 container-executor.cfg 不仅需要是owned by root,且其所在目录同样需要 owned by root。这两者都给自动化部署带来不便,鉴于此部分比较独立且基本不会改变,我们可以将其加入集群机器的 puppet 管理当中。

DataNode 启动方式

CDH 推荐的datanode 的启动方式需要使用低端口并且使用jsvc发布, 在运维方面也不太方便。这里我们通过配置ignore.secure.ports.for.testing=true来启动datanode, 规避这些约束。

客户端针对安全认证的相应调整

集群开启安全认证之后, 依赖集群的客户端(脚本, 服务)都需要做相应修改,不过改动基本无异。大部分服务都已包括对 Kerberos 认证的相应处理, 基本不需要修改。

这里首先得说明一下开启安全认证后的认证方式:

  • 使用密码认证
    使用用户密码通过kinit认证, 获取到的TGT存在本地凭证缓存当中, 供后续访问服务认证使用。一般在交互式访问中使用。
  • 使用 keytab 认证
    用户通过导出的keytab 可以免密码进行用户认证, 后续步骤一致。一般在应用程序中配置使用。

Kerberos 凭证(ticket) 有两个属性, ticket_lifetimerenew_lifetime。其中 ticket_lifetime 表明凭证生效的时限,一般为24小时。在凭证失效前部分凭证可以延期失效时间(即Renewable), renew_lifetime 表明凭证最长可以被延期的时限,一般为一个礼拜。当凭证过期之后,对安全认证的服务的后续访问则会失败。这里第一个问题就是如何处理凭证过期。

凭证过期处理策略

在最早的 Security features for Hadoop 设计中提出这样的假设:

A Hadoop job will run no longer than 7 days (configurable) on a MapReduce cluster or accessing HDFS from the job will fail.

对于一般的任务, 24小时甚至延迟到一周的凭证时限是足够充分的。所以大部分时间我们只需要在执行操作之前使用 kinit 认证一遍,再起后台任务进行周期性凭证更新即可。

while true ; do kinit -R; sleep $((3600 * 6)) ; done &

不过对于需要常驻的访问Hadoop集群的服务来说,上述假设就不成立了。这时候我们可以

  1. 扩大 ticket_lifetimerenew_lifetime 时限
    扩大凭证存活时限可以解决此问题,但由于Kerberos跟我们线上用户登陆认证绑定,会带来安全隐患,故不方便修改。

  2. 定期重新进行kinit 认证更新凭证
    不仅仅是定期延长认证时间,可以直接定期重新认证以延长凭证有限期限。一般我们需要导出 keytab 来进行定期认证的操作。

Hadoop 将 Kerberos 认证部分进行了一定的封装,实际上并不需要那么复杂, 这里重点可以看看 UserGroupInformation 这个类。

UserGroupInformation

UserGroupInformation 这个类在 JAAS 框架上封装了 Hadoop 的用户信息, 更确切地说是对 Subject 做了一层封装。

  UserGroupInformation(Subject subject) {
    this.subject = subject;
    this.user = subject.getPrincipals(User.class).iterator().next();
    this.isKeytab = !subject.getPrivateCredentials(KerberosKey.class).isEmpty();
    this.isKrbTkt = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
  }

JAAS 是 Java 认证和授权服务(Java Authentication and Authorization Service)的缩写, 主要包含以下几个实体:

  • Subject
    Subject 是一个不可继承的实体类,它标志一个请求的来源, 包含相关的凭证标识(Principal) 和 公开和私有的凭据(Credential)。
  • Principal
    凭证标识,认证成功后,一个 Subject 可以被关联多个Principal。
  • Credential
    凭据,有公有凭据以及私有凭据。

JAAS的认证过程如下:

  1. An application instantiates a LoginContext.
  2. The LoginContext consults a Configuration to load all of the LoginModules configured for that application.
  3. The application invokes the LoginContext’s login method.
  4. The login method invokes all of the loaded LoginModules. Each LoginModule attempts to authenticate the subject. Upon success, LoginModules associate relevant Principals and credentials with a Subject object that represents the subject being authenticated.
  5. The LoginContext returns the authentication status to the application.
  6. If authentication succeeded, the application retrieves the Subject from the LoginContext.

需要认证的代码片段可以包装在 doPrivileged 当中, 可以直接使用 Subject.doAs 方法,支持嵌套。

在安全模式下,UGI 支持不同LoginContext 配置, 均是通过 HadoopConfiguration 类动态产生:

  • hadoop-user-kerberos
    使用kerberos缓存凭证登陆的配置, useTicketCache 置为 true.
  • hadoop-keytab-kerberos
    使用keytab登陆的配置, useKeyTab 置为 true.

UGI 当中有多处认证, getLoginUser 方法使用 hadoop-user-kerberos 配置认证:

  1. 通过配置生成 LoginContext
  2. 调用 LoginContext.login 方法完成登陆, 通过 ticket cache 中凭证完成登陆
  3. 判断是否需要其他用户身份(proxy user)执行
  4. HADOOP_TOKEN_FILE_LOCATION 中的 token 加入 Credentials 集合当中
  5. 另起一个线程做周期性的凭证更新 spawnAutoRenewalThreadForUserCreds

步骤5可以看出当我们存在凭证后并不需要主动做周期性地凭证更新。

而 loginUserFromKeytab 方法使用 hadoop-kerberos 配置认证:

  1. 通过配置生成 LoginContext
  2. 调用 LoginContext.login 方法完成登陆, 使用keytab完成登陆

loginUserFromKeytab 没有对凭证做周期的更新, 那怎么保证凭证不会过期呢?

  1. 在访问集群执行相关操作前, 可以调用 checkTGTAndReloginFromKeytab 来尝试更新凭证(实际上是重新登陆了)
  2. 在凭证过期时,创建 IPC 失败会触发调用 reloginFromKeytab 来重新登陆

Client.java

    private synchronized void handleSaslConnectionFailure(
        final int currRetries, final int maxRetries, final Exception ex,
        final Random rand, final UserGroupInformation ugi) throws IOException,
        InterruptedException {
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          final short MAX_BACKOFF = 5000;
          closeConnection();
          disposeSasl();
          if (shouldAuthenticateOverKrb()) {
            if (currRetries < maxRetries) {
              if(LOG.isDebugEnabled()) {
                LOG.debug("Exception encountered while connecting to "
                    + "the server : " + ex);
              }
              // try re-login
              if (UserGroupInformation.isLoginKeytabBased()) {
                UserGroupInformation.getLoginUser().reloginFromKeytab();
              } else {
                UserGroupInformation.getLoginUser().reloginFromTicketCache();
              }

可见如果是使用 keytab 认证的话,认证是长期有效的。

从上述代码中可以看到,不论是否是keytab认证,创建IPC失败均会尝试重新登陆。

基于keytab 的Kerberos认证方式

为了让用户免于记忆密码,我们可以考虑导出并交付keytab给相关用户(前提是用户数量可控, 比如是以虚拟用户为单位)。
这样,用户的Hadoop任务认证方式可以有:

  • 直接使用 keytab kinit 之后访问
  • 或者调用 loginUserFromKeytab 完成登录,然后将代码片段包裹在 UGI 的 doAs 方法当中执行

上线部署

确定了部署方案之后, 我们在升级 hadoop 版本的同时完成了安全认证的部署。在部署和使用中我们遇到若干问题,这里一一说明。

JCE 部署

开启安全认证时发现 Kerberos 认证不通过:

Client failed to SASL authenticate: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Failure unspecified at GSS-API level (Mechanism level: Checksum failed)]

由于我们部署的Kerberos默认使用 AES-256 加密, 需要在Hadoop环境(集群以及客户端)上安装 Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy File, 否则Kerberos认证不通过。可以通过此 gist 验证改动是否生效。此步骤可以添加到puppet当中。

SNN getimage 返回 NPE

开启安全认证发现 SNN 持续由于 getimage 报错NPE 退出, 相关错误如下。

2013-12-29 23:56:19572 DEBUG org.apache.hadoop.security.authentication.server.AuthenticationFilter: Request [http://XXX.com:50070/getimage?getimage=1&txid=8627&storageInfo=-47:200271
8265:0:CID-3dce02cb-a1c2-4ab8-8b12-f23bbefd7bcc] triggering authentication
2013-12-29 23:56:19580 WARN org.apache.hadoop.security.authentication.server.AuthenticationFilter: Authentication exception: GSSException: Failure unspecified at GSS-API level (Mechanism level: Specified
 version of key is not available (44))
org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: Failure unspecified at GSS-API level (Mechanism level: Specified version of key is not available (44))
        at org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler.authenticate(KerberosAuthenticationHandler.java:360)
        at org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:349)

根据报错信息 Specified version of key is not available 发现是由于同一个 HTTP 凭证被导出多遍导致之前的keytab中的凭证失效了,重新生成部署所需的 keytab 即可。
这里的提醒就是不要重复导出相同的凭证, 以防止已经分发使用的keytab中的凭证失效。

Balancer 执行过长导致认证过期

在部署安全认证之后, 我们对hdfs数据进行 balance 就需要预先认证一下再执行, 这样就会遇到我们之前说的认证期限的问题。
这里有两种方式可以解决此问题:

  • 添加外部定时任务重新认证, 刷新凭证缓存, 延迟凭证有效期限。
  • 可以写一个小代码对 balance 的入口 org.apache.hadoop.hdfs.server.balancer.Balancer 进行一点封装,将其封装在一个 doAs 当中, 类似 hue 中的 SudoFsShell 一样的思路

sssd 服务认证异常

sssd 是指我们用于线上登陆认证的一个底层服务,在过去一段时间内经常出现问题退出,导致用户登录动作hang住,进而导致相关任务执行失败。部署Hadoop安全认证之后相关 kerberos 认证也走这个服务,增大了服务异常退出的概率。目前看起来sssd服务问题是由于系统版本过低sssd服务代码有bug导致,解决方案最方便的是升级系统或切换服务到新的机器。

“KDC can’t fulfill requested option while renewing credentials”

应用执行日志偶尔会报如下错误:

2014-03-12 21:30:03593 WARN  security.UserGroupInformation (UserGroupInformation.java:run(794)) - Exception encountered while running the renewal command. Aborting renew thread. org.apache.hadoop.util.Shell$ExitCodeException: kinit(v5): KDC can't fulfill requested option while renewing credentials

表示 UGI的凭证更新线程失败退出了。目前HADOOP-10041 记录了此问题,主要原因是由于凭证无法更新导致, 一般不需要特殊处理。

参考资料

原文出自:https://tech.meituan.com/archives

COS系统的前端演变和发展

美团点评阅读(14)

美团COS:全称美团网核心业务系统部,以持续整合O2O线下资源,共建高效率、低成本的供应链系统,高效推动O2O生态环境建设为业务目标,负责美团网核心业务系统的建设和管理。

COS系统,伴随着美团3年多的发展,前端也积极参与到系统的建设中。 在这几年里,通过优化系统前端环境,改进代码组织结构,丰富公共资源和自动化工具,不断提高了业务响应效率,也在不断努力去逐步缩短系统的前端开发周期,以下简单介绍在这个过程中的一些变化。

第一个COS系统——合同系统

  • 没有独立的静态资源服务,无压缩,采用YUI3的Loader,手动维护依赖关系。
  • 忙着写控件,第一个控件Autocomplete,后来有了Table、Tree、Form、IO等。
  • 线上代码经常不稳定,静态资源地址采用加时间戳的方式来更新。

公共模板部署

由于前端需要支持的业务系统众多,对每个系统而言,都有一些相同的处理逻辑,如前端环境初始化(包括系统的参数配置、YUI部署、UI部署、控件初始化、GA统计源、页面加载时间统计、浏览器升级提醒、问题反馈等)针对每个系统都是一样的,不希望每个系统都要去处理这些逻辑,于是集成了mt-fe.jar到每个后台系统,节约了新开系统的成本。

xxx

模块化道路

  • 模块目录结构扁平化

    所有的模块在目录结构上都是平行的,无区别的。 同时增加了主模块和子模块的概念,并在此基础上定义了统一的加载规则。

  • 模块名称和路径关系约定

    知道一个模块名就可以知道这个模块的代码所在的位置,是否是主模块以及属于某个系统,如:

    crm-module 对应的三个属性应该是:
    {
      path: "/static/module/module.js",
      isMainModule: true,
      app: 'crm'
    }
    deal-module/sub 对应的即:
    {
      path: "/static/module/sub.js",
      isMainModule: false,
      app: 'deal'
    }
    
  • 模块加载机制

    使用YUI3的自动加载,需要给Loader配置一个依赖关系表。最初新增一个模块时,需要在模块定义和Loader配置中都声明该模块的依赖。这样在两个地方维护依赖关系,容易产生不一致,从而带来维护问题。
    为解决上述问题,开发了脚本自动计算所有模块的依赖关系,生成依赖关系表传递给Loader使用,面临的问题是修改模块依赖关系需要运行脚本才能生效,而在开发时更想要所见即所得的效果。于是又针对开发环境,在Loader加载时根据约定的模块名,自动计算出模块的加载路径和类型,从而实现不提前配置依赖关系表也可自动加载。

      一个简单的加载配置
      var metaGroups = {
          "fecore": {
              //发布时自动生成的metaGroups,用于线上环境
              modules: {
                  "moduleA": {
                      path: "moduleA/moduleA.js",
                      requires: ["moduleB", "moduleC"]
                  },
                  ...
              },
              //根据pattern和文件名约定进行自动加载,用于开发环境
              patterns: {
                  "prefix": function(cfg) {
                      cfg.path = "moduleA/moduleA.js";
                      cfg.type = "js";
                      return true;
                  },
                  ...
              }
          }
      };
    
      YUI({
          ...
          groups: metaGroups,
          ...
      }).use('moduleA', function(Y) {
    
      });
    
  • 模块依赖关系梳理

    模块中存在间接依赖,如A依赖B、C,B依赖C,这时在A的依赖关系中只需要声明B就可以工作,如果某天B不需要依赖C了,这时在B中去掉C的成本就变大了。为了解决这类问题,规范了依赖关系声明,并开发工具对源文件进行分析,自动化校验和修改,也计划将该校验加入到各代码仓库的git hooks中。通过该工具的梳理,让开发者能非常明确了解所有模块之间的关系,对宏观掌握当前模块的使用状态也是非常有帮助的。
    xxx

  • 模块的丰富和稳定

    前端支持的项目众多,如何在应用层花最小的代价写代码,是我们一直在思考的问题。 通过不断丰富可复用的组件库、定义统一的UI方案以及提取和整合所有系统的公共模板等来避免重复工作。 目前,除了所有前端公用的代码仓库fe.core外,也为COS系统新增专门的前端代码仓库cos.core,存放和业务相关的模块。同时为了保障模块的稳定性和易用性,开发了模块文档,并进行了测试用例的覆盖。

    • 模块内目录结构完善
      模块中从只包含css、css、tpl文件到包含tests、guide等文件,目前一个完整的模块的目录结构如:

      xxx

    • 组件方面
      从简单的构造器、prototype写对象实现继承到基于YUI3的Widget or Base框架,并在此基础上进行了扩展;不断新增组件和完善组件功能,使其能满足大多数业务需求;对代码进行不断重构,使得组件可以更加稳定。
      我们提倡只要是能被重用的代码,都应该放到相应的公共代码仓库中。

    • UI方面
      不得不说Bootstrap带给web行业的影响是巨大的,特别是针对后台系统。 简洁大气的设计,对于大多数网页元素来讲已经能较好的满足需求,不过针对COS系统,还是有不少需要单独处理的需求,比如各个系统的Layout,一些简单的UI模块和少量交互的组件等,所以在全兼容Bootstrap的基础上做了COS-UI,并对所有的COS系统页面进行了迁移,统一了风格。 也为界面外观定义提供统一标准,降低开发与维护成本。

    • 测试方面
      从使用YUI3提供的YUI Test模块编写单元测试用例,到使用mocha+chai+sinon结合,采用phantomjs进行无界面测试,无缝集成到开发环境,让写测试变的更简单,从而提高了写测试用例的积极性。

    • 文档方面
      从最初专门开发一个应用去为模块编写使用文档,到文档静态化。在完善的模块目录结构基础上,通过梳理文档规范,根据约定自动输出静态化的文档;从静态的demo展示到可以在线修改;从手写的使用说明,到根据YUIDoc生成的注释自动提取文档内容等。使得只需要写最少的内容,即可生成丰富的文档和demo。
      如下是一个简单的构建demo的规范:

        <div class="demo">
            <h1>简单demo</h1>
            <div class="html-content">
                ...
            </div>
            <script>
                ...
            </script>
        </div>
      

      只需要按照上述格式写代码,工具就会自动生成如下静态页面,可以在页面中进行参数修改,便可即时查看到效果。
      demo

COS系统前端分层

用户端和核心业务端的模块都是基于YUI3进行开发,同时在模块化机制的前提下,共用底层库fe.core。 为了更好地针对所有系统业务场景做抽象,开发了专门提供给业务系统使用的模块cos.core。

配置中心会处理所有系统的前端配置,如当前系统环境(开发环境、测试环境、线上环境),YUI的版本号,是否使用Combo服务,是否是调试模式等。

xxx

系统前端开发环境

为了方便系统开发,针对一些平时应用比较普遍的场景开发了自动化的工具,如发布、自动化文档、依赖关系检测、自动化单元测试、全部系统范围内搜索、自动build template等。为了使工具更容易维护,权责更加明晰,在代码组织和管理方面,先后对代码仓库进行了拆分,发布package到内部源,并使用npm来进行包管理,解决了package之间的依赖管理问题。

同时针对各系统提供了一系列的服务,如静态资源、Combo、日志、页面加载性能报表等。 未来还计划开放UI定制,一键开站,动态修改系统配置,在线为某个模块写文档、demo、test,在线管理静态资源等功能。

开发平台旨在希望作为一个窗口,索引与前端有关的所有服务和资源,为开发者提供开发辅助。

xxx

系统发布

  • 系统运行初期,使用Shell脚本处理发布过程(包括资源的压缩、加版本号、计算依赖关系等)。后来由于涉及到的代码仓库增多,发布过程也增加了更多的逻辑,如打包公共模块、修改模板中引用的CSS、图片资源地址等,使得脚本一度维护比较困难,后对脚本进行了拆分。 再后来,考虑到Node的灵活以及社区的活跃,将发布脚本迁移到Node平台,使用Grunt来管理发布任务,同时独立了配置,代码库进行了更细粒度的拆分,使得发布这一过程更加灵活和便于维护。
    xxx

  • 系统发布从后端人工操作到集成到OPS平台一键发布,大大提高了发布效率,减少了bug处理时间。

  • 从使用外部npm源到内部npm源,减少了发布本身的耗时。

以上也为前端cos组在系统建设方面做一个简单总结。非常有幸能在一个重视技术,重视前端的公司里学习成长。回想起这么多的日日夜夜,曾面对每一次技术改造和调整都兴奋不已,会偶尔想方案彻夜难眠,走了很多弯路,开发了很多系统,但每次都能从看似相似其实充满新挑战的系统中获得新的收获。期望每天的点滴进步会让系统开发变得越来越简单高效,Happy Coding!

原文出自:https://tech.meituan.com/archives

基于Flume的美团日志收集系统(二)改进和优化

美团点评阅读(80)

在《基于Flume的美团日志收集系统(一)架构和设计》中,我们详述了基于Flume的美团日志收集系统的架构设计,以及为什么做这样的设计。在本节中,我们将会讲述在实际部署和使用过程中遇到的问题,对Flume的功能改进和对系统做的优化。

1 Flume的问题总结

在Flume的使用过程中,遇到的主要问题如下:

a. Channel“水土不服”:使用固定大小的MemoryChannel在日志高峰时常报队列大小不够的异常;使用FileChannel又导致IO繁忙的问题;

b. HdfsSink的性能问题:使用HdfsSink向Hdfs写日志,在高峰时间速度较慢;

c. 系统的管理问题:配置升级,模块重启等;

2 Flume的功能改进和优化点

从上面的问题中可以看到,有一些需求是原生Flume无法满足的,因此,基于开源的Flume我们增加了许多功能,修改了一些Bug,并且进行一些调优。下面将对一些主要的方面做一些说明。

2.1 增加Zabbix monitor服务

一方面,Flume本身提供了http, ganglia的监控服务,而我们目前主要使用zabbix做监控。因此,我们为Flume添加了zabbix监控模块,和sa的监控服务无缝融合。

另一方面,净化Flume的metrics。只将我们需要的metrics发送给zabbix,避免 zabbix server造成压力。目前我们最为关心的是Flume能否及时把应用端发送过来的日志写到Hdfs上, 对应关注的metrics为:

  • Source : 接收的event数和处理的event数
  • Channel : Channel中拥堵的event数
  • Sink : 已经处理的event数

2.2 为HdfsSink增加自动创建index功能

首先,我们的HdfsSink写到hadoop的文件采用lzo压缩存储。 HdfsSink可以读取hadoop配置文件中提供的编码类列表,然后通过配置的方式获取使用何种压缩编码,我们目前使用lzo压缩数据。采用lzo压缩而非bz2压缩,是基于以下测试数据:

event大小(Byte)sink.batch-sizehdfs.batchSize压缩格式总数据大小(G)耗时(s)平均events/s压缩后大小(G)
54430010000bz29.1244868331.36
54430010000lzo9.1612273333.49

其次,我们的HdfsSink增加了创建lzo文件后自动创建index功能。Hadoop提供了对lzo创建索引,使得压缩文件是可切分的,这样Hadoop Job可以并行处理数据文件。HdfsSink本身lzo压缩,但写完lzo文件并不会建索引,我们在close文件之后添加了建索引功能。

  /**
   * Rename bucketPath file from .tmp to permanent location.
   */
  private void renameBucket() throws IOException, InterruptedException {
      if(bucketPath.equals(targetPath)) {
              return;
        }

        final Path srcPath = new Path(bucketPath);
        final Path dstPath = new Path(targetPath);

        callWithTimeout(new CallRunner<Object>() {
              @Override
              public Object call() throws Exception {
                if(fileSystem.exists(srcPath)) { // could block
                      LOG.info("Renaming " + srcPath + " to " + dstPath);
                     fileSystem.rename(srcPath, dstPath); // could block

                      //index the dstPath lzo file
                      if (codeC != null && ".lzo".equals(codeC.getDefaultExtension()) ) {
                              LzoIndexer lzoIndexer = new LzoIndexer(new Configuration());
                              lzoIndexer.index(dstPath);
                      }
                }
                return null;
              }
    });
}

2.3 增加HdfsSink的开关

我们在HdfsSink和DualChannel中增加开关,当开关打开的情况下,HdfsSink不再往Hdfs上写数据,并且数据只写向DualChannel中的FileChannel。以此策略来防止Hdfs的正常停机维护。

2.4 增加DualChannel

Flume本身提供了MemoryChannel和FileChannel。MemoryChannel处理速度快,但缓存大小有限,且没有持久化;FileChannel则刚好相反。我们希望利用两者的优势,在Sink处理速度够快,Channel没有缓存过多日志的时候,就使用MemoryChannel,当Sink处理速度跟不上,又需要Channel能够缓存下应用端发送过来的日志时,就使用FileChannel,由此我们开发了DualChannel,能够智能的在两个Channel之间切换。

其具体的逻辑如下:

/***
 * putToMemChannel indicate put event to memChannel or fileChannel
 * takeFromMemChannel indicate take event from memChannel or fileChannel
 * */
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

void doPut(Event event) {
        if (switchon && putToMemChannel.get()) {
              //往memChannel中写数据
              memTransaction.put(event);

              if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {
                putToMemChannel.set(false);
              }
        } else {
              //往fileChannel中写数据
              fileTransaction.put(event);
        }
  }

Event doTake() {
    Event event = null;
    if ( takeFromMemChannel.get() ) {
        //从memChannel中取数据
        event = memTransaction.take();
        if (event == null) {
            takeFromMemChannel.set(false);
        } 
    } else {
        //从fileChannel中取数据
        event = fileTransaction.take();
        if (event == null) {
            takeFromMemChannel.set(true);

            putToMemChannel.set(true);
        } 
    }
    return event;
}

2.5 增加NullChannel

Flume提供了NullSink,可以把不需要的日志通过NullSink直接丢弃,不进行存储。然而,Source需要先将events存放到Channel中,NullSink再将events取出扔掉。为了提升性能,我们把这一步移到了Channel里面做,所以开发了NullChannel。

2.6 增加KafkaSink

为支持向Storm提供实时数据流,我们增加了KafkaSink用来向Kafka写实时数据流。其基本的逻辑如下:

public class KafkaSink extends AbstractSink implements Configurable {
        private String zkConnect;
        private Integer zkTimeout;
        private Integer batchSize;
        private Integer queueSize;
        private String serializerClass;
        private String producerType;
        private String topicPrefix;

        private Producer<String, String> producer;

        public void configure(Context context) {
            //读取配置,并检查配置
        }

        @Override
        public synchronized void start() {
            //初始化producer
        }

        @Override
        public synchronized void stop() {
            //关闭producer
        }

        @Override
        public Status process() throws EventDeliveryException {

            Status status = Status.READY;

            Channel channel = getChannel();
            Transaction tx = channel.getTransaction();
            try {
                    tx.begin();

                    //将日志按category分队列存放
                    Map<String, List<String>> topic2EventList = new HashMap<String, List<String>>();

                    //从channel中取batchSize大小的日志,从header中获取category,生成topic,并存放于上述的Map中;

                    //将Map中的数据通过producer发送给kafka 

                   tx.commit();
            } catch (Exception e) {
                    tx.rollback();
                    throw new EventDeliveryException(e);
            } finally {
                tx.close();
            }
            return status;
        }
}

2.7 修复和scribe的兼容问题

Scribed在通过ScribeSource发送数据包给Flume时,大于4096字节的包,会先发送一个Dummy包检查服务器的反应,而Flume的ScribeSource对于logentry.size()=0的包返回TRY_LATER,此时Scribed就认为出错,断开连接。这样循环反复尝试,无法真正发送数据。现在在ScribeSource的Thrift接口中,对size为0的情况返回OK,保证后续正常发送数据。

3. Flume系统调优经验总结

3.1 基础参数调优经验

  • HdfsSink中默认的serializer会每写一行在行尾添加一个换行符,我们日志本身带有换行符,这样会导致每条日志后面多一个空行,修改配置不要自动添加换行符;
lc.sinks.sink_hdfs.serializer.appendNewline = false
  • 调大MemoryChannel的capacity,尽量利用MemoryChannel快速的处理能力;

  • 调大HdfsSink的batchSize,增加吞吐量,减少hdfs的flush次数;

  • 适当调大HdfsSink的callTimeout,避免不必要的超时错误;

3.2 HdfsSink获取Filename的优化

HdfsSink的path参数指明了日志被写到Hdfs的位置,该参数中可以引用格式化的参数,将日志写到一个动态的目录中。这方便了日志的管理。例如我们可以将日志写到category分类的目录,并且按天和按小时存放:

lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%Y%m%d/hour=%H

HdfsS ink中处理每条event时,都要根据配置获取此event应该写入的Hdfs path和filename,默认的获取方法是通过正则表达式替换配置中的变量,获取真实的path和filename。因为此过程是每条event都要做的操作,耗时很长。通过我们的测试,20万条日志,这个操作要耗时6-8s左右。

由于我们目前的path和filename有固定的模式,可以通过字符串拼接获得。而后者比正则匹配快几十倍。拼接定符串的方式,20万条日志的操作只需要几百毫秒。

3.3 HdfsSink的b/m/s优化

在我们初始的设计中,所有的日志都通过一个Channel和一个HdfsSink写到Hdfs上。我们来看一看这样做有什么问题。

首先,我们来看一下HdfsSink在发送数据的逻辑:

//从Channel中取batchSize大小的events
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
    //对每条日志根据category append到相应的bucketWriter上;
    bucketWriter.append(event);
}

for (BucketWriter bucketWriter : writers) {
    //然后对每一个bucketWriter调用相应的flush方法将数据flush到Hdfs上
    bucketWriter.flush();
}

假设我们的系统中有100个category,batchSize大小设置为20万。则每20万条数据,就需要对100个文件进行append或者flush操作。

其次,对于我们的日志来说,基本符合80/20原则。即20%的category产生了系统80%的日志量。这样对大部分日志来说,每20万条可能只包含几条日志,也需要往Hdfs上flush一次。

上述的情况会导致HdfsSink写Hdfs的效率极差。下图是单Channel的情况下每小时的发送量和写hdfs的时间趋势图。

 美团日志收集系统架构

鉴于这种实际应用场景,我们把日志进行了大小归类,分为big, middle和small三类,这样可以有效的避免小日志跟着大日志一起频繁的flush,提升效果明显。下图是分队列后big队列的每小时的发送量和写hdfs的时间趋势图。

 美团日志收集系统架构

4 未来发展

目前,Flume日志收集系统提供了一个高可用,高可靠,可扩展的分布式服务,已经有效地支持了美团的日志数据收集工作。

后续,我们将在如下方面继续研究:

  • 日志管理系统:图形化的展示和控制日志收集系统;

  • 跟进社区发展:跟进Flume 1.5的进展,同时回馈社区;

原文出自:https://tech.meituan.com/archives

全球互联网技术架构,前沿架构参考

联系我们博客/网站内容提交