若谷学院
互联网公司技术架构分享

Hadoop安全实践

美团点评阅读(1535)

前言

在2014年初,我们将线上使用的 Hadoop 1.0 集群切换到 Hadoop 2.2.0 稳定版, 与此同时部署了 Hadoop 的安全认证。本文主要介绍在 Hadoop 2.2.0 上部署安全认证的方案调研实施以及相应的解决方法。

背景

集群安全措施相对薄弱

最早部署Hadoop集群时并没有考虑安全问题,随着集群的不断扩大, 各部门对集群的使用需求增加,集群安全问题就显得颇为重要。说到安全问题,一般包括如下方面:

  • 用户认证(Authentication)
    即是对用户身份进行核对, 确认用户即是其声明的身份, 这里包括用户和服务的认证。

  • 用户授权(Authorization)
    即是权限控制,对特定资源, 特定访问用户进行授权或拒绝访问。用户授权是建立再用户认证的基础上, 没有可靠的用户认证谈不上用户授权。

未开启安全认证时,Hadoop 是以客户端提供的用户名作为用户凭证, 一般即是发起任务的Unix 用户。一般线上机器部署服务会采用统一账号,当以统一账号部署集群时,所有执行 Hadoop 任务的用户都是集群的超级管理员,容易发生误操作。即便是以管理员账号部署集群,恶意用户在客户端仍然可以冒充管理员账号执行。

集群整体升级到 hadoop 2.0

2013年10月份 Hadoop 2.2.0 发布,作为 Apache Hadoop 2.X 的 GA 版本。我们考虑将集群整体升级 Hadoop 2.2.0,进入 yarn 时代。与此同时,我们计划在升级过程中一并把集群安全工作做到位,主要基于以下考虑:

  • 与升级工作一样,安全同样是基础工作,把安全搞好会方便我们后续的工作,否则会成为下一个阻碍。
  • 所谓基础工作,就是越往后改动越难的工作,目前不做,将来依赖更多,开展代价更大。

综上,我们的需求是在低版本hadoop升级到Yarn的过程中部署Hadoop安全认证,做好认证之后我们可以在此之上开启适当的权限控制(hdfs, 队列)。

方案调研

在方案调研之前先明确以下安全实践的原则,如下:

  • 做为一个后端服务平台,部署安全的主要目的是防止用户误操作导致的事故(比如误删数据,误操作等)
  • 做安全是为了开放,开放的前提是保证基本的安全,数据安全与平台安全
  • 在保证安全的前提下,尽量简化运维

分析我们遇到的问题,这里我们需要调研:

  • 账号拆分与相应管理方案
  • 开启 Hadoop 安全认证
  • 客户端针对安全认证的相应调整

账号拆分与相应管理方案

集群账号管理

原先我们使用单一账号作为集群管理员,且这一账号为线上统一登录账号, 这存在极大的安全隐患。我们需要使用特殊账号来管理集群。这里涉及的问题是,我们需要几个运维账号呢?
一种简单的做法是使用一个特殊运维账号(比如 hadoop), CDHApache官方 都推荐按服务划分分账号来启动集群:

User:GroupDaemons
hdfs:hadoopNameNode, Secondary NameNode, Checkpoint Node, Backup Node, DataNode
yarn:hadoopResourceManager, NodeManager
mapred:hadoopMapReduce JobHistory Server

考虑到精细化控制可以有效避免误操作,这里我们遵循官方的建议使用多账号。
在从单一运维账号迁移到多个账号部署时,需要考虑相关文件权限问题,包括本地以及hdfs两部分,这可以在安全部署上线时完成相应改动。

用户账号管理

美团很多小组都有使用 Hadoop 来进行大数据处理需求, 故需要一定程度的多租户环境, 这里主要考虑其中的数据和操作的权限问题。hdfs 本身仅提供类 Unix 的权限系统, 默认的组概念也相对鸡肋。鉴于此,在多用户的管理上可以有简单粗暴的方案:

不同组有各自的根目录,使用不同的账号,对组内文件有全部权限。不同组之间相互不能访问数据(除非手动修改)。

在一个集中的数据仓库环境下,又要生产各个部门的统计数据的话,上述策略不够灵活。目前Cloudera 有一个精细化权限控制的解决方案 sentry, 支持 Role based 的权限管理。由于其定制化较高,不方便使用, 故暂未考虑。

开启 Hadoop 安全认证

Hadoop 的安全认证是基于 Kerberos 实现的。 Kerberos 是一个网络身份验证协议,用户只需输入身份验证信息,验证通过获取票据即可访问多个接入 Kerberos 的服务, 机器的单点登录也可以基于此协议完成的。 Hadoop 本身并不创建用户账号,而是使用 Kerberos 协议来进行用户身份验证,从Kerberos凭证中的用户信息获取用户账号, 这样一来跟实际用户运行的账号也无关。

这里我们从 YARN 上的 MR 任务提交过程简单说明一下:
 Yarn 任务提交步骤

  1. 用户执行任务前,先通过KDC认证自己,获取TGT(Ticket Granting Ticket)。KDC是 Kerberos 认证的中心服务,存储用户和服务的认证信息。
  2. 用户通过 TGT 向 KDC 请求访问服务的Ticket, KDC 生成 session key 后一并发给客户端。
  3. 客户端通过 service ticket 向服务认证自己,完成身份认证。
  4. 完成身份认证后客户端向服务请求若干token供后续任务执行认证使用(比如 HDFS NameNode Delegation Token, YARN ResourceManager Delegation Token)
  5. 客户端连同获取到的 token 一并提交任务,后续任务执行使用 token 进行来自服务的认证

从上可以看出,出于性能的考虑,Hadoop 安全认证体系中仅在用户跟服务通信以及各个服务之间通信适用 Kerberos 认证,在用户认证后任务执行,访问服务,读取/写入数据等均采用特定服务(NameNode, Resource Manager)发起访问token,让需求方凭借 token 访问相应服务和数据。这里 token 的传递,认证以及更新不做深入讨论。

关于开启 Hadoop 安全认证, Cloudera 有详细的文档介绍。由于自身环境以及部署运维的考虑,最终的部署方案有些许出入, 一一说明。

Kerberos 部署

Hadoop 安全认证需要一个 Kerberos 集群, 部署 Kerberos 需要部署KDC。 由于我们的环境中使用 freeIPA 进行主机认证相关的权限控制,已经集成 Kerberos 服务, 故不需要另外部署。
Kerberos 相关的运维操作, 比如添加用户,服务,导出keytab,均可以通过 ipa 相关接口来进行。

Container 的选择

从上图可以看出用户发起的任务是在特定的容器(Container)内执行的, 一开始我们考虑使用DefaultContainer 而不是官方推荐的 LinuxContainer, 缺点是对任务之间的物理隔离以及防范恶意任务方面会有缺陷, 不过方便部署,使用LinuxContainer需要在集群各台机器上部署用户账号。
实际测试发现由于MAPREDUCE-5208的引入,在 hadoop 2.2.0 上开启安全认证后无法使用 DefaultContainer
这里不希望对代码有过多定制化的修改,我们考虑还是使用 LinuxContainer, 需要解决一下问题:

  • 用户账号创建
    我们需要在集群内添加所有可能的任务发起用户账号。借助 freeipa 的统一的用户管理 , 我们只需要在 freeipa 上添加相应用户即可。
  • container-executor 和 container-executor.cfg 的部署
    container-executor 作为Yarn 的 container 执行程序,有一系列的权限要求:

    Be owned by root
    Be owned by a group that contains only the user running the YARN daemons
    Be setuid
    Be group readable and executable

    配置 container-executor.cfg 不仅需要是owned by root,且其所在目录同样需要 owned by root。这两者都给自动化部署带来不便,鉴于此部分比较独立且基本不会改变,我们可以将其加入集群机器的 puppet 管理当中。

DataNode 启动方式

CDH 推荐的datanode 的启动方式需要使用低端口并且使用jsvc发布, 在运维方面也不太方便。这里我们通过配置ignore.secure.ports.for.testing=true来启动datanode, 规避这些约束。

客户端针对安全认证的相应调整

集群开启安全认证之后, 依赖集群的客户端(脚本, 服务)都需要做相应修改,不过改动基本无异。大部分服务都已包括对 Kerberos 认证的相应处理, 基本不需要修改。

这里首先得说明一下开启安全认证后的认证方式:

  • 使用密码认证
    使用用户密码通过kinit认证, 获取到的TGT存在本地凭证缓存当中, 供后续访问服务认证使用。一般在交互式访问中使用。
  • 使用 keytab 认证
    用户通过导出的keytab 可以免密码进行用户认证, 后续步骤一致。一般在应用程序中配置使用。

Kerberos 凭证(ticket) 有两个属性, ticket_lifetimerenew_lifetime。其中 ticket_lifetime 表明凭证生效的时限,一般为24小时。在凭证失效前部分凭证可以延期失效时间(即Renewable), renew_lifetime 表明凭证最长可以被延期的时限,一般为一个礼拜。当凭证过期之后,对安全认证的服务的后续访问则会失败。这里第一个问题就是如何处理凭证过期。

凭证过期处理策略

在最早的 Security features for Hadoop 设计中提出这样的假设:

A Hadoop job will run no longer than 7 days (configurable) on a MapReduce cluster or accessing HDFS from the job will fail.

对于一般的任务, 24小时甚至延迟到一周的凭证时限是足够充分的。所以大部分时间我们只需要在执行操作之前使用 kinit 认证一遍,再起后台任务进行周期性凭证更新即可。

while true ; do kinit -R; sleep $((3600 * 6)) ; done &

不过对于需要常驻的访问Hadoop集群的服务来说,上述假设就不成立了。这时候我们可以

  1. 扩大 ticket_lifetimerenew_lifetime 时限
    扩大凭证存活时限可以解决此问题,但由于Kerberos跟我们线上用户登陆认证绑定,会带来安全隐患,故不方便修改。

  2. 定期重新进行kinit 认证更新凭证
    不仅仅是定期延长认证时间,可以直接定期重新认证以延长凭证有限期限。一般我们需要导出 keytab 来进行定期认证的操作。

Hadoop 将 Kerberos 认证部分进行了一定的封装,实际上并不需要那么复杂, 这里重点可以看看 UserGroupInformation 这个类。

UserGroupInformation

UserGroupInformation 这个类在 JAAS 框架上封装了 Hadoop 的用户信息, 更确切地说是对 Subject 做了一层封装。

  UserGroupInformation(Subject subject) {
    this.subject = subject;
    this.user = subject.getPrincipals(User.class).iterator().next();
    this.isKeytab = !subject.getPrivateCredentials(KerberosKey.class).isEmpty();
    this.isKrbTkt = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
  }

JAAS 是 Java 认证和授权服务(Java Authentication and Authorization Service)的缩写, 主要包含以下几个实体:

  • Subject
    Subject 是一个不可继承的实体类,它标志一个请求的来源, 包含相关的凭证标识(Principal) 和 公开和私有的凭据(Credential)。
  • Principal
    凭证标识,认证成功后,一个 Subject 可以被关联多个Principal。
  • Credential
    凭据,有公有凭据以及私有凭据。

JAAS的认证过程如下:

  1. An application instantiates a LoginContext.
  2. The LoginContext consults a Configuration to load all of the LoginModules configured for that application.
  3. The application invokes the LoginContext’s login method.
  4. The login method invokes all of the loaded LoginModules. Each LoginModule attempts to authenticate the subject. Upon success, LoginModules associate relevant Principals and credentials with a Subject object that represents the subject being authenticated.
  5. The LoginContext returns the authentication status to the application.
  6. If authentication succeeded, the application retrieves the Subject from the LoginContext.

需要认证的代码片段可以包装在 doPrivileged 当中, 可以直接使用 Subject.doAs 方法,支持嵌套。

在安全模式下,UGI 支持不同LoginContext 配置, 均是通过 HadoopConfiguration 类动态产生:

  • hadoop-user-kerberos
    使用kerberos缓存凭证登陆的配置, useTicketCache 置为 true.
  • hadoop-keytab-kerberos
    使用keytab登陆的配置, useKeyTab 置为 true.

UGI 当中有多处认证, getLoginUser 方法使用 hadoop-user-kerberos 配置认证:

  1. 通过配置生成 LoginContext
  2. 调用 LoginContext.login 方法完成登陆, 通过 ticket cache 中凭证完成登陆
  3. 判断是否需要其他用户身份(proxy user)执行
  4. HADOOP_TOKEN_FILE_LOCATION 中的 token 加入 Credentials 集合当中
  5. 另起一个线程做周期性的凭证更新 spawnAutoRenewalThreadForUserCreds

步骤5可以看出当我们存在凭证后并不需要主动做周期性地凭证更新。

而 loginUserFromKeytab 方法使用 hadoop-kerberos 配置认证:

  1. 通过配置生成 LoginContext
  2. 调用 LoginContext.login 方法完成登陆, 使用keytab完成登陆

loginUserFromKeytab 没有对凭证做周期的更新, 那怎么保证凭证不会过期呢?

  1. 在访问集群执行相关操作前, 可以调用 checkTGTAndReloginFromKeytab 来尝试更新凭证(实际上是重新登陆了)
  2. 在凭证过期时,创建 IPC 失败会触发调用 reloginFromKeytab 来重新登陆

Client.java

    private synchronized void handleSaslConnectionFailure(
        final int currRetries, final int maxRetries, final Exception ex,
        final Random rand, final UserGroupInformation ugi) throws IOException,
        InterruptedException {
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          final short MAX_BACKOFF = 5000;
          closeConnection();
          disposeSasl();
          if (shouldAuthenticateOverKrb()) {
            if (currRetries < maxRetries) {
              if(LOG.isDebugEnabled()) {
                LOG.debug("Exception encountered while connecting to "
                    + "the server : " + ex);
              }
              // try re-login
              if (UserGroupInformation.isLoginKeytabBased()) {
                UserGroupInformation.getLoginUser().reloginFromKeytab();
              } else {
                UserGroupInformation.getLoginUser().reloginFromTicketCache();
              }

可见如果是使用 keytab 认证的话,认证是长期有效的。

从上述代码中可以看到,不论是否是keytab认证,创建IPC失败均会尝试重新登陆。

基于keytab 的Kerberos认证方式

为了让用户免于记忆密码,我们可以考虑导出并交付keytab给相关用户(前提是用户数量可控, 比如是以虚拟用户为单位)。
这样,用户的Hadoop任务认证方式可以有:

  • 直接使用 keytab kinit 之后访问
  • 或者调用 loginUserFromKeytab 完成登录,然后将代码片段包裹在 UGI 的 doAs 方法当中执行

上线部署

确定了部署方案之后, 我们在升级 hadoop 版本的同时完成了安全认证的部署。在部署和使用中我们遇到若干问题,这里一一说明。

JCE 部署

开启安全认证时发现 Kerberos 认证不通过:

Client failed to SASL authenticate: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Failure unspecified at GSS-API level (Mechanism level: Checksum failed)]

由于我们部署的Kerberos默认使用 AES-256 加密, 需要在Hadoop环境(集群以及客户端)上安装 Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy File, 否则Kerberos认证不通过。可以通过此 gist 验证改动是否生效。此步骤可以添加到puppet当中。

SNN getimage 返回 NPE

开启安全认证发现 SNN 持续由于 getimage 报错NPE 退出, 相关错误如下。

2013-12-29 23:56:19572 DEBUG org.apache.hadoop.security.authentication.server.AuthenticationFilter: Request [http://XXX.com:50070/getimage?getimage=1&txid=8627&storageInfo=-47:200271
8265:0:CID-3dce02cb-a1c2-4ab8-8b12-f23bbefd7bcc] triggering authentication
2013-12-29 23:56:19580 WARN org.apache.hadoop.security.authentication.server.AuthenticationFilter: Authentication exception: GSSException: Failure unspecified at GSS-API level (Mechanism level: Specified
 version of key is not available (44))
org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: Failure unspecified at GSS-API level (Mechanism level: Specified version of key is not available (44))
        at org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler.authenticate(KerberosAuthenticationHandler.java:360)
        at org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:349)

根据报错信息 Specified version of key is not available 发现是由于同一个 HTTP 凭证被导出多遍导致之前的keytab中的凭证失效了,重新生成部署所需的 keytab 即可。
这里的提醒就是不要重复导出相同的凭证, 以防止已经分发使用的keytab中的凭证失效。

Balancer 执行过长导致认证过期

在部署安全认证之后, 我们对hdfs数据进行 balance 就需要预先认证一下再执行, 这样就会遇到我们之前说的认证期限的问题。
这里有两种方式可以解决此问题:

  • 添加外部定时任务重新认证, 刷新凭证缓存, 延迟凭证有效期限。
  • 可以写一个小代码对 balance 的入口 org.apache.hadoop.hdfs.server.balancer.Balancer 进行一点封装,将其封装在一个 doAs 当中, 类似 hue 中的 SudoFsShell 一样的思路

sssd 服务认证异常

sssd 是指我们用于线上登陆认证的一个底层服务,在过去一段时间内经常出现问题退出,导致用户登录动作hang住,进而导致相关任务执行失败。部署Hadoop安全认证之后相关 kerberos 认证也走这个服务,增大了服务异常退出的概率。目前看起来sssd服务问题是由于系统版本过低sssd服务代码有bug导致,解决方案最方便的是升级系统或切换服务到新的机器。

“KDC can’t fulfill requested option while renewing credentials”

应用执行日志偶尔会报如下错误:

2014-03-12 21:30:03593 WARN  security.UserGroupInformation (UserGroupInformation.java:run(794)) - Exception encountered while running the renewal command. Aborting renew thread. org.apache.hadoop.util.Shell$ExitCodeException: kinit(v5): KDC can't fulfill requested option while renewing credentials

表示 UGI的凭证更新线程失败退出了。目前HADOOP-10041 记录了此问题,主要原因是由于凭证无法更新导致, 一般不需要特殊处理。

参考资料

原文出自:https://tech.meituan.com/archives

基于Flume的美团日志收集系统(一)架构和设计

美团点评阅读(1529)

美团的日志收集系统负责美团的所有业务日志的收集,并分别给Hadoop平台提供离线数据和Storm平台提供实时数据流。美团的日志收集系统基于Flume设计和搭建而成。

《基于Flume的美团日志收集系统》将分两部分给读者呈现美团日志收集系统的架构设计和实战经验。

第一部分架构和设计,将主要着眼于日志收集系统整体的架构设计,以及为什么要做这样的设计。

第二部分改进和优化,将主要着眼于实际部署和使用过程中遇到的问题,对Flume做的功能修改和优化等。

1 日志收集系统简介

日志收集是大数据的基石。

许多公司的业务平台每天都会产生大量的日志数据。收集业务日志数据,供离线和在线的分析系统使用,正是日志收集系统的要做的事情。高可用性,高可靠性和可扩展性是日志收集系统所具有的基本特征。

目前常用的开源日志收集系统有Flume, Scribe等。Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,目前已经是Apache的一个子项目。Scribe是Facebook开源的日志收集系统,它为日志的分布式收集,统一处理提供一个可扩展的,高容错的简单方案。

2 常用的开源日志收集系统对比

下面将对常见的开源日志收集系统Flume和Scribe的各方面进行对比。对比中Flume将主要采用Apache下的Flume-NG为参考对象。同时,我们将常用的日志收集系统分为三层(Agent层,Collector层和Store层)来进行对比。

对比项Flume-NGScribe
使用语言Javac/c++
容错性Agent和Collector间,Collector和Store间都有容错性,且提供三种级别的可靠性保证;Agent和Collector间, Collector和Store之间有容错性;
负载均衡Agent和Collector间,Collector和Store间有LoadBalance和Failover两种模式
可扩展性
Agent丰富程度提供丰富的Agent,包括avro/thrift socket, text, tail等主要是thrift端口
Store丰富程度可以直接写hdfs, text, console, tcp;写hdfs时支持对text和sequence的压缩;提供buffer, network, file(hdfs, text)等
代码结构系统框架好,模块分明,易于开发代码简单

3 美团日志收集系统架构

美团的日志收集系统负责美团的所有业务日志的收集,并分别给Hadoop平台提供离线数据和Storm平台提供实时数据流。美团的日志收集系统基于Flume设计和搭建而成。目前每天收集和处理约T级别的日志数据。

下图是美团的日志收集系统的整体框架图。

 美团日志收集系统架构

a. 整个系统分为三层:Agent层,Collector层和Store层。其中Agent层每个机器部署一个进程,负责对单机的日志收集工作;Collector层部署在中心服务器上,负责接收Agent层发送的日志,并且将日志根据路由规则写到相应的Store层中;Store层负责提供永久或者临时的日志存储服务,或者将日志流导向其它服务器。

b. Agent到Collector使用LoadBalance策略,将所有的日志均衡地发到所有的Collector上,达到负载均衡的目标,同时并处理单个Collector失效的问题。

c. Collector层的目标主要有三个:SinkHdfs, SinkKafka和SinkBypass。分别提供离线的数据到Hdfs,和提供实时的日志流到Kafka和Bypass。其中SinkHdfs又根据日志量的大小分为SinkHdfs_b,SinkHdfs_m和SinkHdfs_s三个Sink,以提高写入到Hdfs的性能,具体见后面介绍。

d. 对于Store来说,Hdfs负责永久地存储所有日志;Kafka存储最新的7天日志,并给Storm系统提供实时日志流;Bypass负责给其它服务器和应用提供实时日志流。

下图是美团的日志收集系统的模块分解图,详解Agent, Collector和Bypass中的Source, Channel和Sink的关系。

 美团日志收集系统架构

a. 模块命名规则:所有的Source以src开头,所有的Channel以ch开头,所有的Sink以sink开头;

b. Channel统一使用美团开发的DualChannel,具体原因后面详述;对于过滤掉的日志使用NullChannel,具体原因后面详述;

c. 模块之间内部通信统一使用Avro接口;

4 架构设计考虑

下面将从可用性,可靠性,可扩展性和兼容性等方面,对上述的架构做细致的解析。

4.1 可用性(availablity)

对日志收集系统来说,可用性(availablity)指固定周期内系统无故障运行总时间。要想提高系统的可用性,就需要消除系统的单点,提高系统的冗余度。下面来看看美团的日志收集系统在可用性方面的考虑。

4.1.1 Agent死掉

Agent死掉分为两种情况:机器死机或者Agent进程死掉。

对于机器死机的情况来说,由于产生日志的进程也同样会死掉,所以不会再产生新的日志,不存在不提供服务的情况。

对于Agent进程死掉的情况来说,确实会降低系统的可用性。对此,我们有下面三种方式来提高系统的可用性。首先,所有的Agent在supervise的方式下启动,如果进程死掉会被系统立即重启,以提供服务。其次,对所有的Agent进行存活监控,发现Agent死掉立即报警。最后,对于非常重要的日志,建议应用直接将日志写磁盘,Agent使用spooldir的方式获得最新的日志。

4.1.2 Collector死掉

由于中心服务器提供的是对等的且无差别的服务,且Agent访问Collector做了LoadBalance和重试机制。所以当某个Collector无法提供服务时,Agent的重试策略会将数据发送到其它可用的Collector上面。所以整个服务不受影响。

4.1.3 Hdfs正常停机

我们在Collector的HdfsSink中提供了开关选项,可以控制Collector停止写Hdfs,并且将所有的events缓存到FileChannel的功能。

4.1.4 Hdfs异常停机或不可访问

假如Hdfs异常停机或不可访问,此时Collector无法写Hdfs。由于我们使用DualChannel,Collector可以将所收到的events缓存到FileChannel,保存在磁盘上,继续提供服务。当Hdfs恢复服务以后,再将FileChannel中缓存的events再发送到Hdfs上。这种机制类似于Scribe,可以提供较好的容错性。

4.1.5 Collector变慢或者Agent/Collector网络变慢

如果Collector处理速度变慢(比如机器load过高)或者Agent/Collector之间的网络变慢,可能导致Agent发送到Collector的速度变慢。同样的,对于此种情况,我们在Agent端使用DualChannel,Agent可以将收到的events缓存到FileChannel,保存在磁盘上,继续提供服务。当Collector恢复服务以后,再将FileChannel中缓存的events再发送给Collector。

4.1.6 Hdfs变慢

当Hadoop上的任务较多且有大量的读写操作时,Hdfs的读写数据往往变的很慢。由于每天,每周都有高峰使用期,所以这种情况非常普遍。

对于Hdfs变慢的问题,我们同样使用DualChannel来解决。当Hdfs写入较快时,所有的events只经过MemChannel传递数据,减少磁盘IO,获得较高性能。当Hdfs写入较慢时,所有的events只经过FileChannel传递数据,有一个较大的数据缓存空间。

4.2 可靠性(reliability)

对日志收集系统来说,可靠性(reliability)是指Flume在数据流的传输过程中,保证events的可靠传递。

对Flume来说,所有的events都被保存在Agent的Channel中,然后被发送到数据流中的下一个Agent或者最终的存储服务中。那么一个Agent的Channel中的events什么时候被删除呢?当且仅当它们被保存到下一个Agent的Channel中或者被保存到最终的存储服务中。这就是Flume提供数据流中点到点的可靠性保证的最基本的单跳消息传递语义。

那么Flume是如何做到上述最基本的消息传递语义呢?

首先,Agent间的事务交换。Flume使用事务的办法来保证event的可靠传递。Source和Sink分别被封装在事务中,这些事务由保存event的存储提供或者由Channel提供。这就保证了event在数据流的点对点传输中是可靠的。在多级数据流中,如下图,上一级的Sink和下一级的Source都被包含在事务中,保证数据可靠地从一个Channel到另一个Channel转移。

 美团日志收集系统架构

其次,数据流中 Channel的持久性。Flume中MemoryChannel是可能丢失数据的(当Agent死掉时),而FileChannel是持久性的,提供类似mysql的日志机制,保证数据不丢失。

4.3 可扩展性(scalability)

对日志收集系统来说,可扩展性(scalability)是指系统能够线性扩展。当日志量增大时,系统能够以简单的增加机器来达到线性扩容的目的。

对于基于Flume的日志收集系统来说,需要在设计的每一层,都可以做到线性扩展地提供服务。下面将对每一层的可扩展性做相应的说明。

4.3.1 Agent层

对于Agent这一层来说,每个机器部署一个Agent,可以水平扩展,不受限制。一个方面,Agent收集日志的能力受限于机器的性能,正常情况下一个Agent可以为单机提供足够服务。另一方面,如果机器比较多,可能受限于后端Collector提供的服务,但Agent到Collector是有Load Balance机制,使得Collector可以线性扩展提高能力。

4.3.2 Collector层

对于Collector这一层,Agent到Collector是有Load Balance机制,并且Collector提供无差别服务,所以可以线性扩展。其性能主要受限于Store层提供的能力。

4.3.3 Store层

对于Store这一层来说,Hdfs和Kafka都是分布式系统,可以做到线性扩展。Bypass属于临时的应用,只对应于某一类日志,性能不是瓶颈。

4.4 Channel的选择

Flume1.4.0中,其官方提供常用的MemoryChannel和FileChannel供大家选择。其优劣如下:

  • MemoryChannel: 所有的events被保存在内存中。优点是高吞吐。缺点是容量有限并且Agent死掉时会丢失内存中的数据。

  • FileChannel: 所有的events被保存在文件中。优点是容量较大且死掉时数据可恢复。缺点是速度较慢。

上述两种Channel,优缺点相反,分别有自己适合的场景。然而,对于大部分应用来说,我们希望Channel可以同提供高吞吐和大缓存。基于此,我们开发了DualChannel。

  • DualChannel:基于 MemoryChannel和 FileChannel开发。当堆积在Channel中的events数小于阈值时,所有的events被保存在MemoryChannel中,Sink从MemoryChannel中读取数据; 当堆积在Channel中的events数大于阈值时, 所有的events被自动存放在FileChannel中,Sink从FileChannel中读取数据。这样当系统正常运行时,我们可以使用MemoryChannel的高吞吐特性;当系统有异常时,我们可以利用FileChannel的大缓存的特性。

4.5 和scribe兼容

在设计之初,我们就要求每类日志都有一个category相对应,并且Flume的Agent提供AvroSource和ScribeSource两种服务。这将保持和之前的Scribe相对应,减少业务的更改成本。

4.6 权限控制

在目前的日志收集系统中,我们只使用最简单的权限控制。只有设定的category才可以进入到存储系统。所以目前的权限控制就是category过滤。

如果权限控制放在Agent端,优势是可以较好地控制垃圾数据在系统中流转。但劣势是配置修改麻烦,每增加一个日志就需要重启或者重载Agent的配置。

如果权限控制放在Collector端,优势是方便进行配置的修改和加载。劣势是部分没有注册的数据可能在Agent/Collector之间传输。

考虑到Agent/Collector之间的日志传输并非系统瓶颈,且目前日志收集属内部系统,安全问题属于次要问题,所以选择采用Collector端控制。

4.7 提供实时流

美团的部分业务,如实时推荐,反爬虫服务等服务,需要处理实时的数据流。因此我们希望Flume能够导出一份实时流给Kafka/Storm系统。

一个非常重要的要求是实时数据流不应该受到其它Sink的速度影响,保证实时数据流的速度。这一点,我们是通过Collector中设置不同的Channel进行隔离,并且DualChannel的大容量保证了日志的处理不受Sink的影响。

5 系统监控

对于一个大型复杂系统来说,监控是必不可少的部分。设计合理的监控,可以对异常情况及时发现,只要有一部手机,就可以知道系统是否正常运作。对于美团的日志收集系统,我们建立了多维度的监控,防止未知的异常发生。

5.1 发送速度,拥堵情况,写Hdfs速度

通过发送给zabbix的数据,我们可以绘制出发送数量、拥堵情况和写Hdfs速度的图表,对于超预期的拥堵,我们会报警出来查找原因。

下面是Flume Collector HdfsSink写数据到Hdfs的速度截图:

 美团日志收集系统架构

下面是Flume Collector的FileChannel中拥堵的events数据量截图:

 美团日志收集系统架构

5.2 flume写hfds状态的监控

Flume写入Hdfs会先生成tmp文件,对于特别重要的日志,我们会每15分钟左右检查一下各个Collector是否都产生了tmp文件,对于没有正常产生tmp文件的Collector和日志我们需要检查是否有异常。这样可以及时发现Flume和日志的异常.

5.3 日志大小异常监控

对于重要的日志,我们会每个小时都监控日志大小周同比是否有较大波动,并给予提醒,这个报警有效的发现了异常的日志,且多次发现了应用方日志发送的异常,及时给予了对方反馈,帮助他们及早修复自身系统的异常。

通过上述的讲解,我们可以看到,基于Flume的美团日志收集系统已经是具备高可用性,高可靠性,可扩展等特性的分布式服务。

原文出自:https://tech.meituan.com/archives

基于Flume的美团日志收集系统(二)改进和优化

美团点评阅读(1521)

在《基于Flume的美团日志收集系统(一)架构和设计》中,我们详述了基于Flume的美团日志收集系统的架构设计,以及为什么做这样的设计。在本节中,我们将会讲述在实际部署和使用过程中遇到的问题,对Flume的功能改进和对系统做的优化。

1 Flume的问题总结

在Flume的使用过程中,遇到的主要问题如下:

a. Channel“水土不服”:使用固定大小的MemoryChannel在日志高峰时常报队列大小不够的异常;使用FileChannel又导致IO繁忙的问题;

b. HdfsSink的性能问题:使用HdfsSink向Hdfs写日志,在高峰时间速度较慢;

c. 系统的管理问题:配置升级,模块重启等;

2 Flume的功能改进和优化点

从上面的问题中可以看到,有一些需求是原生Flume无法满足的,因此,基于开源的Flume我们增加了许多功能,修改了一些Bug,并且进行一些调优。下面将对一些主要的方面做一些说明。

2.1 增加Zabbix monitor服务

一方面,Flume本身提供了http, ganglia的监控服务,而我们目前主要使用zabbix做监控。因此,我们为Flume添加了zabbix监控模块,和sa的监控服务无缝融合。

另一方面,净化Flume的metrics。只将我们需要的metrics发送给zabbix,避免 zabbix server造成压力。目前我们最为关心的是Flume能否及时把应用端发送过来的日志写到Hdfs上, 对应关注的metrics为:

  • Source : 接收的event数和处理的event数
  • Channel : Channel中拥堵的event数
  • Sink : 已经处理的event数

2.2 为HdfsSink增加自动创建index功能

首先,我们的HdfsSink写到hadoop的文件采用lzo压缩存储。 HdfsSink可以读取hadoop配置文件中提供的编码类列表,然后通过配置的方式获取使用何种压缩编码,我们目前使用lzo压缩数据。采用lzo压缩而非bz2压缩,是基于以下测试数据:

event大小(Byte)sink.batch-sizehdfs.batchSize压缩格式总数据大小(G)耗时(s)平均events/s压缩后大小(G)
54430010000bz29.1244868331.36
54430010000lzo9.1612273333.49

其次,我们的HdfsSink增加了创建lzo文件后自动创建index功能。Hadoop提供了对lzo创建索引,使得压缩文件是可切分的,这样Hadoop Job可以并行处理数据文件。HdfsSink本身lzo压缩,但写完lzo文件并不会建索引,我们在close文件之后添加了建索引功能。

  /**
   * Rename bucketPath file from .tmp to permanent location.
   */
  private void renameBucket() throws IOException, InterruptedException {
      if(bucketPath.equals(targetPath)) {
              return;
        }

        final Path srcPath = new Path(bucketPath);
        final Path dstPath = new Path(targetPath);

        callWithTimeout(new CallRunner<Object>() {
              @Override
              public Object call() throws Exception {
                if(fileSystem.exists(srcPath)) { // could block
                      LOG.info("Renaming " + srcPath + " to " + dstPath);
                     fileSystem.rename(srcPath, dstPath); // could block

                      //index the dstPath lzo file
                      if (codeC != null && ".lzo".equals(codeC.getDefaultExtension()) ) {
                              LzoIndexer lzoIndexer = new LzoIndexer(new Configuration());
                              lzoIndexer.index(dstPath);
                      }
                }
                return null;
              }
    });
}

2.3 增加HdfsSink的开关

我们在HdfsSink和DualChannel中增加开关,当开关打开的情况下,HdfsSink不再往Hdfs上写数据,并且数据只写向DualChannel中的FileChannel。以此策略来防止Hdfs的正常停机维护。

2.4 增加DualChannel

Flume本身提供了MemoryChannel和FileChannel。MemoryChannel处理速度快,但缓存大小有限,且没有持久化;FileChannel则刚好相反。我们希望利用两者的优势,在Sink处理速度够快,Channel没有缓存过多日志的时候,就使用MemoryChannel,当Sink处理速度跟不上,又需要Channel能够缓存下应用端发送过来的日志时,就使用FileChannel,由此我们开发了DualChannel,能够智能的在两个Channel之间切换。

其具体的逻辑如下:

/***
 * putToMemChannel indicate put event to memChannel or fileChannel
 * takeFromMemChannel indicate take event from memChannel or fileChannel
 * */
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

void doPut(Event event) {
        if (switchon && putToMemChannel.get()) {
              //往memChannel中写数据
              memTransaction.put(event);

              if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {
                putToMemChannel.set(false);
              }
        } else {
              //往fileChannel中写数据
              fileTransaction.put(event);
        }
  }

Event doTake() {
    Event event = null;
    if ( takeFromMemChannel.get() ) {
        //从memChannel中取数据
        event = memTransaction.take();
        if (event == null) {
            takeFromMemChannel.set(false);
        } 
    } else {
        //从fileChannel中取数据
        event = fileTransaction.take();
        if (event == null) {
            takeFromMemChannel.set(true);

            putToMemChannel.set(true);
        } 
    }
    return event;
}

2.5 增加NullChannel

Flume提供了NullSink,可以把不需要的日志通过NullSink直接丢弃,不进行存储。然而,Source需要先将events存放到Channel中,NullSink再将events取出扔掉。为了提升性能,我们把这一步移到了Channel里面做,所以开发了NullChannel。

2.6 增加KafkaSink

为支持向Storm提供实时数据流,我们增加了KafkaSink用来向Kafka写实时数据流。其基本的逻辑如下:

public class KafkaSink extends AbstractSink implements Configurable {
        private String zkConnect;
        private Integer zkTimeout;
        private Integer batchSize;
        private Integer queueSize;
        private String serializerClass;
        private String producerType;
        private String topicPrefix;

        private Producer<String, String> producer;

        public void configure(Context context) {
            //读取配置,并检查配置
        }

        @Override
        public synchronized void start() {
            //初始化producer
        }

        @Override
        public synchronized void stop() {
            //关闭producer
        }

        @Override
        public Status process() throws EventDeliveryException {

            Status status = Status.READY;

            Channel channel = getChannel();
            Transaction tx = channel.getTransaction();
            try {
                    tx.begin();

                    //将日志按category分队列存放
                    Map<String, List<String>> topic2EventList = new HashMap<String, List<String>>();

                    //从channel中取batchSize大小的日志,从header中获取category,生成topic,并存放于上述的Map中;

                    //将Map中的数据通过producer发送给kafka 

                   tx.commit();
            } catch (Exception e) {
                    tx.rollback();
                    throw new EventDeliveryException(e);
            } finally {
                tx.close();
            }
            return status;
        }
}

2.7 修复和scribe的兼容问题

Scribed在通过ScribeSource发送数据包给Flume时,大于4096字节的包,会先发送一个Dummy包检查服务器的反应,而Flume的ScribeSource对于logentry.size()=0的包返回TRY_LATER,此时Scribed就认为出错,断开连接。这样循环反复尝试,无法真正发送数据。现在在ScribeSource的Thrift接口中,对size为0的情况返回OK,保证后续正常发送数据。

3. Flume系统调优经验总结

3.1 基础参数调优经验

  • HdfsSink中默认的serializer会每写一行在行尾添加一个换行符,我们日志本身带有换行符,这样会导致每条日志后面多一个空行,修改配置不要自动添加换行符;
lc.sinks.sink_hdfs.serializer.appendNewline = false
  • 调大MemoryChannel的capacity,尽量利用MemoryChannel快速的处理能力;

  • 调大HdfsSink的batchSize,增加吞吐量,减少hdfs的flush次数;

  • 适当调大HdfsSink的callTimeout,避免不必要的超时错误;

3.2 HdfsSink获取Filename的优化

HdfsSink的path参数指明了日志被写到Hdfs的位置,该参数中可以引用格式化的参数,将日志写到一个动态的目录中。这方便了日志的管理。例如我们可以将日志写到category分类的目录,并且按天和按小时存放:

lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%Y%m%d/hour=%H

HdfsS ink中处理每条event时,都要根据配置获取此event应该写入的Hdfs path和filename,默认的获取方法是通过正则表达式替换配置中的变量,获取真实的path和filename。因为此过程是每条event都要做的操作,耗时很长。通过我们的测试,20万条日志,这个操作要耗时6-8s左右。

由于我们目前的path和filename有固定的模式,可以通过字符串拼接获得。而后者比正则匹配快几十倍。拼接定符串的方式,20万条日志的操作只需要几百毫秒。

3.3 HdfsSink的b/m/s优化

在我们初始的设计中,所有的日志都通过一个Channel和一个HdfsSink写到Hdfs上。我们来看一看这样做有什么问题。

首先,我们来看一下HdfsSink在发送数据的逻辑:

//从Channel中取batchSize大小的events
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
    //对每条日志根据category append到相应的bucketWriter上;
    bucketWriter.append(event);
}

for (BucketWriter bucketWriter : writers) {
    //然后对每一个bucketWriter调用相应的flush方法将数据flush到Hdfs上
    bucketWriter.flush();
}

假设我们的系统中有100个category,batchSize大小设置为20万。则每20万条数据,就需要对100个文件进行append或者flush操作。

其次,对于我们的日志来说,基本符合80/20原则。即20%的category产生了系统80%的日志量。这样对大部分日志来说,每20万条可能只包含几条日志,也需要往Hdfs上flush一次。

上述的情况会导致HdfsSink写Hdfs的效率极差。下图是单Channel的情况下每小时的发送量和写hdfs的时间趋势图。

 美团日志收集系统架构

鉴于这种实际应用场景,我们把日志进行了大小归类,分为big, middle和small三类,这样可以有效的避免小日志跟着大日志一起频繁的flush,提升效果明显。下图是分队列后big队列的每小时的发送量和写hdfs的时间趋势图。

 美团日志收集系统架构

4 未来发展

目前,Flume日志收集系统提供了一个高可用,高可靠,可扩展的分布式服务,已经有效地支持了美团的日志数据收集工作。

后续,我们将在如下方面继续研究:

  • 日志管理系统:图形化的展示和控制日志收集系统;

  • 跟进社区发展:跟进Flume 1.5的进展,同时回馈社区;

原文出自:https://tech.meituan.com/archives

美团外卖客户端高可用建设体系

美团点评阅读(1822)

背景

美团外卖从2013年11月开始起步,经过数年的高速发展,一直在不断地刷新着记录。2018年5月19日,日订单量峰值突破2000万单,已经成为全球规模最大的外卖平台。业务的快速发展对系统稳定性提出了更高的要求,如何为线上用户提供高稳定的服务体验,保障全链路业务和系统高可用运行,不仅需要后端服务支持,更需要在端上提供全面的技术保障。而相对服务端而言,客户端运行环境千差万别,不可控因素多,面对突发问题应急能力差。因此,构建客户端的高可用建设体系,保障服务稳定高可用,不仅是对工程师的技术挑战,也是外卖平台的核心竞争力之一。

高可用建设体系的思路

一个设计良好的大型客户端系统往往是由一系列各自独立的小组共同开发完成的,每一个小组都应当具有明确定义的的职责划分。各业务模块之间推行“松耦合”开发模式,让业务模块拥有隔离式变更的能力,是一种可以同时提升开发灵活性和系统健壮性的有效手段。这是美团外卖整体的业务架构,整体上以商品交易链路(门店召回,商品展示,交易)为核心方向进行建设,局部上依据业务特点和团队分工分成多个可独立运维单元单独维护。可独立运维单元的简单性是可靠性的前提条件,这使得我们能够持续关注功能迭代,不断完成相关的工程开发任务。

图片1

我们将问题依照生命周期划分为三个阶段:发现、定位、解决,围绕这三个阶段的持续建设,构成了美团外卖高可用建设体系的核心。

美团外卖质量保障体系全景图

这是美团外卖客户端整体质量体系全景图。整体思路:监控报警,日志体系,容灾。

图片2

通过采集业务稳定性,基础能力稳定性,性能稳定性三大类指标数据并上报,衡量客户端系统质量的标准得以完善;通过设立基线,应用特定业务模型对这一系列指标进行监控报警,客户端具备了分钟级感知核心链路稳定性的能力;而通过搭建日志体系,整个系统有了提取关键线索能力,多维度快速定位问题。当问题一旦定位,我们就能通过美团外卖的线上运维规范进行容灾操作:降级,切换通道或限流,从而保证整体的核心链路稳定性。

监控&报警

监控系统,处于整个服务可靠度层级模型的最底层,是运维一个可靠的稳定系统必不可少的重要组成部分。为了保障全链路业务和系统高可用运行,需要在用户感知问题之前发现系统中存在的异常,离开了监控系统,我们就没有能力分辨客户端是不是在正常提供服务。

图片3

按照监控的领域方向,可以分成系统监控与业务监控。
系统监控,主要用于基础能力如端到端成功率,服务响应时长,网络流量,硬件性能等相关的监控。系统监控侧重在无业务侵入和定制系统级别的监控,更多侧重在业务应用的底层,多属于单系统级别的监控。
业务监控,侧重在某个时间区间,业务的运行情况分析。业务监控系统构建于系统监控之上,可以基于系统监控的数据指标计算,并基于特定的业务介入,实现多系统之间的数据联合与分析,并根据相应的业务模型,提供实时的业务监控与告警。按照业务监控的时效性,可以继续将其细分成实时业务监控与离线业务监控。

  • 实时业务监控,通过实时的数据采集分析,帮助快速发现及定位线上问题,提供告警机制及介入响应(人工或系统)途径,帮助避免发生系统故障。
  • 离线的业务监控,对一定时间段收集的数据进行数据挖掘、聚合、分析,推断出系统业务可能存在的问题,帮助进行业务上的重新优化或改进的监控。

美团外卖的业务监控,大部分属于实时业务监控。借助美团统一的系统监控建设基础,美团外卖联合公司其他部门将部分监控基础设施进行了改造、共建和整合复用,并打通形成闭环(监控,日志,回捞),我们构建了特定符合外卖业务流程的实时业务监控; 而离线的业务监控,主要通过用户行为的统计与业务数据的挖掘分析,来帮助产品设计,运营策略行为等产生影响,目前这部分监控主要由美团外卖数据组提供服务。值得特别说明的是单纯的信息汇总展示,无需或无法立即做出介入动作的业务监控,可以称之为业务分析,如特定区域的活动消费情况、区域订单数量、特定路径转换率、曝光点击率等,除非这些数据用来决策系统实时状态健康情况,帮助产生系统维护行为,否则这部分监控由离线来处理更合适。

图片4

我们把客户端稳定性指标分为3类维度:业务稳定性指标,基础能力稳定性指标,性能稳定性指标。对不同的指标,我们采用不同的采集方案进行提取上报,汇总到不同系统;在设定完指标后,我们就可以制定基线,并依照特定的业务模型制定报警策略。美团外卖客户端拥有超过40项度量质量指标,其中25项指标支持分钟级别报警。报警通道依据紧急程度支持邮件,IM和短信三条通道。因此,我们团队具备及时发现影响核心链路稳定性的关键指标变化能力。

一个完善的监控报警系统是非常复杂的,因此在设计时一定要追求简化。以下是《Site Reliability Engineering: How Google Runs Production Systems》一书中提到的告警设置原则:

最能反映真实故障的规则应该可预测性强,非常可靠,并且越简单越好
不常用的数据采集,汇总以及告警配置应该定时清除(某些SRE团队的标准是一季度未使用即删除)
没有暴露给任何监控后台、告警规则的采集数据指标应该定时清除

通过监控&报警系统,2017年下半年美团外卖客户端团队共发现影响核心链路稳定性超过20起问题:包括爬虫、流量、运营商403问题、性能问题等。目前,所有问题均已全部改造完毕。

日志体系

监控系统的一个重要特征是生产紧急告警。一旦出现故障,需要有人来调查这项告警,以决定目前是否存在真实故障,是否需要采取特定方法缓解故障,直至查出导致故障的问题根源。

简单定位和深入调试的过程必须要保持非常简单,必须能够被团队中任何一个人所理解。日志体系,在简化这一过程中起到了决定性作用。

图片5

美团外卖的日志体系总体分为3大类:即全量日志系统,个体日志系统,异常日志系统。全量日志系统,主要负责采集整体性指标,如网络可用性,埋点可用性,我们可以通过他了解到系统整体大盘,了解整体波动,确定问题影响范围;异常日志系统,主要采集异常指标,如大图问题,分享失败,定位失败等,我们通过他可以迅速获取异常上下文信息,分析解决问题;而个体日志系统,则用于提取个体用户的关键信息,从而针对性的分析特定客诉问题。这三类日志,构成了完整的客户端日志体系。

图片2

日志的一个典型使用场景是处理单点客诉问题,解决系统潜在隐患。个体日志系统,用于简化工程师提取关键线索步骤,提升定位分析问题效率。在这一领域,美团外卖使用的是点评平台开发的Logan服务。作为美团移动端底层的基础日志库,Logan接入了集团众多日志系统,例如端到端日志、用户行为日志、代码级日志、崩溃日志等,并且这些日志全部都是本地存储,且有多重加密机制和严格的权限审核机制,在处理用户客诉时才对数据进行回捞和分析,保证用户隐私安全。

通过设计和实施美团外卖核心链路日志方案,我们打通了用户交易流程中各系统如订单,用户中心,Crash平台与Push后台之间的底层数据同步;通过输出标准问题分析手册,针对常见个体问题的分析和处理得以标准化;通过制定日志捞取SOP并定期演练,线上追溯能力大幅提升,日常客诉绝大部分可在30分钟内定位原因。在这一过程中,通过个体暴露出影响核心链路稳定性的问题也均已全部改进/修复。

故障排查是运维大型系统的一项关键技能。采用系统化的工具和手段而不仅仅依靠经验甚至运气,这项技能是可以自我学习,也可以内部进行传授。

容灾备份

针对不同级别的服务,应该采取不同的手段进行有效止损。非核心依赖,通过降级向用户提供可伸缩的服务;而核心依赖,采用多通道方式进行依赖备份容灾保证交易路径链路的高可用;异常流量,通过多维度限流,最大限度保证业务可用性的同时,给予用户良好的体验。总结成三点,即:非核心依赖降级、核心依赖备份、过载保护限流。接下来我们分别来阐述这三方面。

降级

图片6

在这里选取美团外卖客户端整体系统结构关系图来介绍非核心依赖降级建设概览。图上中间红色部分是核心关键节点,即外卖业务的核心链路:定位,商家召回,商品展示,下单;蓝色部分,是核心链路依赖的关键服务;黄色部分,是可降级服务。我们通过梳理依赖关系,改造前后端通讯协议,实现了客户端非核心依赖可降级;而后端服务,通过各级缓存,屏蔽隔离策略,实现了业务模块内部可降级,业务之间可降级。这构成了美团外卖客户端整体的降级体系。

右边则是美团外卖客户端业务/技术降级开关流程图。通过推拉结合,缓存更新策略,我们能够分钟级别同步降级配置,快速止损。

目前,美团外卖客户端有超过20项业务/能力支持降级。通过有效降级,我们避开了1次S2级事故,多次S3、S4级事故。此外,降级开关整体方案产出SDK horn,推广至集团酒旅、金融等其他核心业务应用。

备份

核心依赖备份建设上,在此重点介绍美团外卖多网络通道。网络通道,作为客户端的最核心依赖,却是整个全链路体系最不可控的部分,经常出现问题:网络劫持,运营商故障,甚至光纤被物理挖断等大大小小的故障严重影响了核心链路的稳定性。因此,治理网络问题,必须要建设可靠的多通道备份。

图片7

这是美团外卖多网络通道备份示意图。美团外卖客户端拥有Shark、HTTP、HTTPS、HTTP DNS等四条网络通道:整体网络以Shark长连通道为主通道,其余三条通道作为备份通道。配合完备的开关切换流程,可以在网络指标发生骤降时,实现分钟级别的分城市网络通道切换。而通过制定故障应急SOP并不断演练,提升了我们解决问题的能力和速度,有效应对各类网络异常。我们的网络通道开关思路也输出至集团其他部门,有效支持了业务发展。

限流

服务过载是另一类典型的事故。究其原因大部分情况下都是由于少数调用方调用的少数接口性能很差,导致对应服务的性能恶化。若调用端缺乏有效降级容错,在某些正常情况下能够降低错误率的手段,如请求失败后重试,反而会让服务进一步性能恶化,甚至影响本来正常的服务调用。

美团外卖业务在高峰期订单量已达到了相当高的规模量级,业务系统也及其复杂。根据以往经验,在业务高峰期,一旦出现异常流量疯狂增长从而导致服务器宕机,则损失不可估量。

因此,美团外卖前后端联合开发了一套“流量控制系统”,对流量实施实时控制。既能日常保证业务系统稳定运转,也能在业务系统出现问题的时候提供一套优雅的降级方案,最大限度保证业务的可用性,在将损失降到最低的前提下,给予用户良好的体验。

图片8

整套系统,后端服务负责识别打标分类,通过统一的协议告诉前端所标识类别;而前端,通过多级流控检查,对不同流量进行区分处理:弹验证码,或排队等待,或直接处理,或直接丢弃。
面对不同场景,系统支持多级流控方案,可有效拦截系统过载流量,防止系统雪崩。此外,整套系统拥有分接口流控监控能力,可对流控效果进行监控,及时发现系统异常。整套方案在数次异常流量增长的故障中,经受住了考验。

发布

随着外卖业务的发展,美团外卖的用户量和订单量已经达到了相当的量级,在线直接全量发布版本/功能影响范围大,风险高。
版本灰度和功能灰度是一种能够平滑过渡的发布方式:即在线上进行A/B实验,让一部分用户继续使用产品(特性)A,另一部分用户开始使用产品(特性)B。如果各项指标平稳正常,结果符合预期,则扩大范围,将所有用户都迁移到B上来,否则回滚。灰度发布可以保证系统的稳定,在初试阶段就可以发现问题,修复问题,调整策略,保证影响范围不被扩散。

美团外卖客户端在版本灰度及功能灰度已较为完善。
版本灰度 iOS采用苹果官方提供的分阶段发布方式,Android则采用美团自研的EVA包管理后台进行发布。这两类发布均支持逐步放量的分发方式。
功能灰度 功能发布开关配置系统依据用户特征维度(如城市,用户ID)发布,并且整个配置系统有测试和线上两套不同环境,配合固定的上线窗口,保证上线的规范性。
对应的,相应的监控基础设施也支持分用户特征维度(如城市,用户ID)监控,避免了那些无法在整体大盘体现的灰度异常。此外,无论版本灰度或功能灰度,我们均有相应最小灰度周期和回滚机制,保证整个灰度发布过程可控,最小化问题影响。

线上运维

图片9

在故障来临时如何应对,是整个质量保障体系中最关键的环节。没有人天生就能完美的处理紧急情况,面对问题,恰当的处理需要平时不断的演练。
围绕问题的生命周期,即发现、定位、解决(预防),美团外卖客户端团队组建了一套完备的处理流程和规范来应对影响链路稳定性的各类线上问题。整体思路:建立规范,提前建设,有效应对,事后总结。在不同阶段用不同方式解决不同问题,事前确定完整的事故流程管理策略,并确保平稳实施,经常演练,问题的平均恢复时间大大降低,美团外卖核心链路的高稳定性才能够得以保障。

未来展望

当前美团外卖业务仍然处于快速增长期。伴随着业务的发展,背后支持业务的技术系统也日趋复杂。在美团外卖客户端高可用体系建设过程中,我们希望能够通过一套智能化运维系统,帮助工程师快速、准确的识别核心链路各子系统异常,发现问题根源,并自动执行对应的异常解决预案,进一步缩短服务恢复时间,从而避免或减少线上事故影响。

诚然,业界关于自动化运维的探索有很多,但多数都集中在后台服务领域,前端方向成果较少。我们外卖技术团队目前也在同步的探索中,正处于基础性建设阶段,欢迎更多业界同行跟我们一起讨论、切磋。

参考资料

  1. Site Reliability Engineering: How Google Runs Production Systems
  2. 美团点评移动端基础日志库——Logan
  3. 美团点评移动网络优化实践

作者简介

陈航,美团高级技术专家。2015年加入美团,目前负责美团外卖iOS团队,对移动端架构演进,监控报警备份容灾,移动端线上运维等领域有深刻理解。

富强,美团资深工程师。2015年加入美团,是外卖iOS的早期开发者之一,目前作为美团外卖iOS基础设施小组负责人,负责外卖基础设施及广告运营相关业务。

徐宏,美团高级工程师。2016年加入美团,目前作为外卖iOS团队主力开发,负责移动端APM性能监控,高可用基础设施支撑相关推进工作。

招聘

美团外卖长期招聘iOS、Android、FE高级/资深工程师和技术专家,可Base在北京、上海、成都,欢迎有兴趣的同学将简历发送至chenhang03#meituan.com。

原文出自:https://tech.meituan.com/archives

大众点评账号业务高可用进阶之路

美团点评阅读(1794)

引言

在任何一家互联网公司,不管其主营业务是什么,都会有一套自己的账号体系。账号既是公司所有业务发展留下的最宝贵资产,它可以用来衡量业务指标,例如日活、月活、留存等,同时也给不同业务线提供了大量潜在用户,业务可以基于账号来做用户画像,制定各自的发展路径。因此,账号服务的重要性不言而喻,同时美团业务飞速发展,对账号业务的可用性要求也越来越高。本文将分享一些我们在高可用探索中的实践。

衡量一个系统的可用性有两个指标:1. MTBF (Mean Time Between Failure) 即平均多长时间不出故障;2. MTTR (Mean Time To Recovery) 即出故障后的平均恢复时间。通过这两个指标可以计算出可用性,也就是我们大家比较熟悉的“几个9”。

可用性公式

因此提升系统的可用性,就得从这两个指标入手,要么降低故障恢复的时间,要么延长不出故障的时间。

1. 业务监控

要降低故障恢复的时间,首先得尽早的发现故障,然后才能解决故障,这些故障包括系统内和系统外的,这就需要依赖业务监控系统。

业务监控不同于其他监控系统,业务监控关注的是各个业务指标是否正常,比如账号的登录曲线。大众点评登录入口有很多,从终端上分有App、PC、M站,从登录类型上分有密码登录、快捷登录、第三方登录(微信/QQ/微博)、小程序登录等。需要监控的维度有登录总数、成功数、失败分类、用户地区、App版本号、浏览器类型、登录来源Referer、服务所在机房等等。业务监控最能从直观上告诉我们系统的运行状况。

由于业务监控的维度很多很杂,有时还要增加新的监控维度,并且告警分析需要频繁聚合不同维度的数据,因此我们采用ElasticSearch作为日志存储。整体架构如下图:

每条监控都会根据过去的业务曲线计算出一条基线(见下图),用来跟当前数据做对比,超出设定的阈值后就会触发告警。

监控曲线图

每次收到告警,我们都要去找出背后的原因,如果是流量涨了,是有活动了还是被刷了?如果流量跌了,是日志延时了还是服务出问题了?另外值得重视的是告警的频次,如果告警太多就会稀释大家的警惕性。我们曾经踩过一次坑,因为告警太多就把告警关了,结果就在关告警的这段时间业务出问题了,我们没有及时发现。为了提高每条告警的定位速度,我们在每条告警后面加上维度分析。如下图(非真实数据),告警里直接给出分析结果。

监控告警分析图

其实业务监控也从侧面反映出一个系统的可用性,所谓服务未动,监控先行。

2. 柔性可用

柔性可用的目的是延长不出故障的时间,当业务依赖的下游服务出故障时不影响自身的核心功能或服务。账号对上层业务提供的鉴权和查询服务即核心服务,这些服务的QPS非常高,业务方对它们的可用性要求也很高,别说是服务故障,就连任何一点抖动都是不能接受的。对此我们先从整体架构上把服务拆分,其次在服务内对下游依赖做资源隔离,都尽可能的缩小故障发生时的影响范围。

另外对非关键路径上的服务故障做了降级。例如账号的一个查询服务依赖Redis,当Redis抖动的时候服务的可用性也随之降低,我们通过公司内部另外一套缓存中间件Cellar来做Redis的备用存储,当检测到Redis已经非常不可用时就切到Cellar上。通过开源组件Hystrix或者我们公司自研的中间件Rhino就能非常方便地解决这类问题,其原理是根据最近一个时间窗口内的失败率来预测下一个请求需不需要快速失败,从而自动降级,这些步骤都能在毫秒级完成,相比人工干预的情况提升几个数量级,因此系统的可用性也会大幅提高。下图是优化前后的对比图,可以非常明显的看到,系统的容错能力提升了,TP999也能控制在合理范围内。

性能对比图前

性能对比图后

对于关键路径上的服务故障我们可以减少其影响的用户数。比如手机快捷登录流程里的某个关键服务挂了,我们可以在返回的失败文案上做优化,并且在登录入口挂小黄条提示,让用户主动去其他登录途径,这样对于那些设置过密码或者绑定了第三方的用户还有其他选择。

具体的做法是我们在每个登录入口都关联了一个计数器,一旦其中的关键节点不可用,就会在受影响的计数器上加1,如果节点恢复,则会减1,每个计数器还分别对应一个标志位,当计数器大于0时,标志位为1,否则标志位为0。我们可以根据当前标志位的值得知登录入口的可用情况,从而在登录页展示不同的提示文案,这些提示文案一共有2^5=32种。

登录降级流程图

下图是我们在做故障模拟时的降级提示文案:

3. 异地多活

除了柔性可用,还有一种思路可以来延长不出故障的时间,那就是做冗余,冗余的越多,系统的故障率就越低,并且是呈指数级降低。不管是机房故障,还是存储故障,甚至是网络故障,都能依赖冗余去解决,比如数据库可以通过增加从库的方式做冗余,服务层可以通过分布式架构做冗余,但是冗余也会带来新的问题,比如成本翻倍,复杂性增加,这就要衡量投入产出比。

目前美团的数据中心机房主要在北京上海,各个业务都直接或间接的依赖账号服务,尽管公司内已有北上专线,但因为专线故障或抖动引发的账号服务不可用,间接导致的业务损失也不容忽视,我们就开始考虑做跨城的异地冗余,即异地多活。

3.1 方案设计

首先我们调研了业界比较成熟的做法,主流思路是分set化,优点是非常利于扩展,缺点是只能按一个维度划分。比如按用户ID取模划分set,其他的像手机号和邮箱的维度就要做出妥协,尤其是这些维度还有唯一性要求,这就使得数据同步或者修改都增加了复杂度,而且极易出错,给后续维护带来困难。考虑到账号读多写少的特性(读写比是350:1),我们采用了一主多从的数据库部署方案,优先解决读多活的问题。

Redis如果也用一主多从的模式可行吗?答案是不行,因为Redis主从同步机制会优先尝试增量同步,当增量同步不成功时,再去尝试全量同步,一旦专线发生抖动就会把主库拖垮,并进一步阻塞专线,形成“雪崩效应”。因此两地的Redis只能是双主模式,但是这种架构有一个问题,就是我们得自己去解决数据同步的问题,除了保证数据不丢,还要保证数据一致。

另外从用户进来的每一层路由都要是就近的,因此DNS需要开启智能解析,SLB要开启同城策略,RPC已默认就近访问。

总体上账号的异地多活遵循以下三个原则:
1.北上任何一地故障,另一地都可提供完整服务。
2.北上两地同时对外提供服务,确保服务随时可用。
3.两地服务都遵循BASE原则,确保数据最终一致。

最终设计方案如下:

异地多活架构图

3.2 数据同步

首先要保证数据在传输的过程中不能丢,因此需要一个可靠接收数据的地方,于是我们采用了公司内部的MQ平台Mafka(类Kafka)做数据中转站。可是消息在经过Mafka传递之后可能是乱序的,这导致对同一个key的一串操作序列可能导致不一致的结果,这是不可忍受的。但Mafka只是不保证全局有序,在单个partition内却是有序的,于是我们只要对每个key做一遍一致性散列算法对应一个partitionId,这样就能保证每个key的操作是有序的。

但仅仅有序还不够,两地的并发写仍然会造成数据的不一致。这里涉及到分布式数据的一致性问题,业界有两种普遍的认知,一种是Paxos协议,一种是Raft协议,我们吸取了对实现更为友好的Raft协议,它主张有一个主节点,其余是从节点,并且在主节点不可用时,从节点可晋升为主节点。简单来说就是把这些节点排个序,当写入有冲突时,以排在最前面的那个节点为准,其余节点都去follow那个主节点的值。在技术实现上,我们设计出一个版本号(见下图),实际上是一个long型整数,其中数据源大小即表示节点的顺序,把版本号存入value里面,当两个写入发生冲突的时候只要比较这个版本号的大小即可,版本号大的覆盖小的,这样能保证写冲突时的数据一致性。

异地多活之版本号

写并发时数据同步过程如下图:

写并发时数据同步过程图

这种同步方式的好处显而易见,可以适用于所有的Redis操作且能保证数据的最终一致性。但这也有一些弊端,由于多存了版本号导致Redis存储会增加,另外在该机制下两地的数据其实是全量同步的,这对于那些仅用做缓存的存储来说是非常浪费资源的,因为缓存有数据库可以回源。而账号服务几乎一半的Redis存储都是缓存,因此我们需要对缓存同步做优化。

账号服务的缓存加载与更新模式如下图:

缓存加载与更新模式

我们优化的方向是在缓存加载时不同步,只有在数据库有更新时才去同步。但是数据更新这个流程里不能再使用delete操作,这样做有可能使缓存出现脏数据,比如下面这个例子:

我们对这个问题的解决办法是用set(若key不存在则添加,否则覆盖)代替delete,而缓存的加载用add(若key不存在则添加,否则不修改),这样能保证缓存更新时的强一致性却不需要增加额外存储。考虑到账号修改的入口比较多,我们希望缓存更新的逻辑能拎出来单独处理减少耦合,最后发现公司内部数据同步组件Databus非常适用于该场景,其主要功能是把数据库的变更日志以消息的形式发出来。于是优化后的缓存模式如下图:

从理论变为工程实现的时候还有些需要注意的地方,比如同步消息没发出去、数据收到后写失败了。因此我们还需要一个方法来检测数据不一致的数量,为了做到这点,我们新建了一个定时任务去scan两地的数据做对比统计,如果发现有不一致的还能及时修复掉。

项目上线后,我们也取得一些成果,首先性能提升非常明显,异地的调用平均耗时和TP99、TP999均至少下降80%,并且在一次线上专线故障期间,账号读服务对外的可用性并没有受影响,避免了更大范围的损失。

总结

服务的高可用需要持续性的投入与维护,比如我们会每月做一次容灾演练。高可用也不止体现在某一两个重点项目上,更多的体现在每个业务开发同学的日常工作里。任何一个小Bug都可能引起一次大的故障,让你前期所有的努力都付之东流,因此我们的每一行代码,每一个方案,每一次线上改动都应该是仔细推敲过的。高可用应该成为一种思维方式。最后希望我们能在服务高可用的道路上越走越远。

团队简介

账号团队拥有一群朝气蓬勃的成员:堂堂、德鑫、杨正、可可、徐升、艳豪,虽然他们之中有些刚毕业不久,但技术上都锐意进取,在讨论技术方案时观点鲜明,大家都充分地享受着思想碰撞的火花,这个年轻的团队在一起推进着高可用项目的进行,共同撑起了账号服务的平稳运行及业务发展。

招聘信息

如果你觉得我们的高可用仍有提升空间,欢迎来大众点评基础平台研发组;

如果你想更深入学习高可用的技术细节,欢迎来大众点评基础平台研发组;

如果你想遇到一群志同道合的技术开发,欢迎来大众点评基础平台研发组。

简历传送门:tangtang.sha#dianping.com。

原文出自:https://tech.meituan.com/archives

实时数据产品实践——美团大交通战场沙盘

美团点评阅读(1845)

背景

大数据时代,数据的重要性不言而喻,尤其对于互联网公司,随着业务的快速变化,商业模式的不断创新、用户体验个性化、实时化需求日益突出,海量数据实时处理在商业方面的需求越来越大。如何通过数据快速分析出用户的行为,以便做出准确的决策,越来越体现一个公司的价值。现阶段对于实时数据的建设比较单一,主要存在以下问题:

  1. 实时仓库建设不足,维度及指标不够丰富,无法快速满足不同业务需求。
  2. 实时数据和离线数据对比不灵活,无法自动化新增对比基期数据,且对比数据无法预先生产。
  3. 数据监控不及时,一旦数据出现问题而无法及时监控到,就会影响业务分析决策。
    因此,本文将基于美团点评大交通实时数据产品,从面临的挑战、总体解决方案、数据设计架构、后台设计架构等几个方面,详细介绍实时数据系统的整体建设思路。

挑战

实时流数据来源系统较多,处理非常复杂,并且不同业务场景对实时数据的要求不同,因此在建设过程主要有以下挑战:

  1. 如何在保证数据准确性的前提下实现多实时流关联;实时流出现延迟、乱序、重复时如何解决。
    流式计算中通常需要将多个实时流按某些主键进行关联得到特定的实时数据,但不同于离线数据表关联,实时流的到达是一个增量的过程,无法获取实时流的全量数据,并且实时流的达到次序无法确定,因此在进行关联时需要考虑存储一些中间状态及下发策略问题。
  2. 实时流可复用性,实时流的处理不能只为解决一个问题,而是一类甚至几类问题,需要从业务角度对数据进行抽象,分层建设,以快速满足不同场景下对数据的要求。
  3. 中台服务如何保证查询性能、数据预警及数据安全。
    实时数据指标维度较为丰富,多维度聚合查询场景对服务层的性能要求较高,需要服务层能够支持较快的计算能力和响应能力;同时数据出现问题后,需要做好及时监控并快速修复。
  4. 如何保证产品应用需求个性化。
    实时数据与离线数据对比不灵活,需要提供可配置方案,并能够及时生产离线数据。

解决思路

我们在充分梳理业务需求的基础上,重新对实时流进行了建设,将实时数据分层建模,并对外提供统一的接口,保证数据同源同口径;同时,在数据服务层,增加可配置信息模块解决了配置信息不能自动化的问题,在数据处理策略上做了多线程处理、预计算、数据降级等优化,在数据安全方面增加数据审计功能,更好地提升了产品的用户体验。

总体方案

产品整体建设方案基于美团点评技术平台,总共分为源数据层、存储层、服务层及WEB层,整体架构如下所示:


图1 整体架构图

源数据层:主要提供三部分数据,实时数据、离线数据、配置信息、维度信息。
存储层:源数据清洗后放入相应的存储引擎中,为服务层提供数据服务。
服务层:提供三部分功能,数据API服务、预计算服务、权限服务、数据审计服务。
Web层:使用Echarts可视化数据。

数据层

数据架构

依托于美团点评提供的公共资源平台,数据架构按功能分为数据采集、数据处理、数据存储、数据服务四层,如下所示:


图2 数据架构图

数据采集

数据来源主要有两种:业务上报的Log日志及数据库Binlog日志。这些日志通过美团点评日志中心进行采集后存储在消息中间件Kafka中,并按照不同的分类存储在不同的Topic中,供下游订阅。

数据处理

数据处理顾名思义,就是对采集的实时流进行逻辑处理,按业务需求输出对应的实时数据,因此这一步骤是流式计算的关键,分两步进行:数据加工、数据推送。

数据加工:数据加工通常需要在流式计算系统中进行,目前流行的流式处理系统主要有Storm、Spark Streaming系统及Flink系统,这些系统都能在不同的应用场景下发挥很好处理能力,并各有优缺点,如下图所示:

计算框架吞吐量延迟传输保障处理模式成熟度
Storm毫秒级At least once单条处理成熟
Spark Streaming秒级Exactly once微批处理成熟
Flink毫秒级Exactly once单条处理/微批处理新兴

最终我们选择Storm作为实时数据处理框架,并借助公司提供的通用组件来简化拓扑开发流程和重复代码编码。例如,组件MTSimpleLogBolt的主要功能是将Kafka中读取的数据(Log or Binlog)解析成Java实体对象;组件StormConfHelper的功能是获取Storm作业应用配置信息。

数据推送:将处理好的数据推送到存储引擎中。

数据存储

数据加工完成后会被存储到实时存储引擎中,以提供给下游使用。目前常用的存储引擎主要有MySQL、Druid、Elasticsearch、Redis、Tair比较如下:

存储引擎优点缺点
MySQL使用简单,支持数据量小数据量大,对MySQL的压力大,查询性能慢
Druid数据预计算不支持精确查询
Elasticsearch查询效率快,支持常用聚合操作;可以做到精确去重查询条件受限
Redis内存存储KV,查询效率高写入资源有限,不支持大数据量写入
Tair持久化和非持久化两种缓存,查询效率高单节点性能比Redis较弱
Kylin多维查询预计算不支持实时

综上比较,由于实时数据量较大,且数据精度要求较高,因此我们最终选择交易存储使用ES,流量存储使用Druid,维度存储使用Tair,中间数据存储使用Redis;而离线数据,我们采用Hive和Kylin存储。

数据服务

将存储引擎数据统一对外提供查询服务,支持不同业务应用场景。

具体实现

实时流处理流程

整个数据层架构上主要分为实时数据和离线数据两部分:实时数据分为交易的Binlog日志和流量的Log日志,经过Strom框架处理后写入Kafka,再经过DataLinkStreaming分别写入ES和Druid;离线数据通过Hive处理写入Kylin。


图3 产品数据架构

下图所示为一条消息的处理流程:


图4 数据关系

两个Topic分别是order_base(主要存放订单基本信息:订单id、订单状态、支付时间、票量、金额等);order_biz(主要存放订单的扩展信息:订单id、订单类型、出发时间、到达时间、出发城市、到达城市)。我们最终要拿到一条包括上述全部内容的一条记录。


图5 数据处理流程

具体例子:Bolt在处理一条记录时,首先判断这条记录是base还是biz,如果是base则写入缓存中base的Category中,如果是biz则写入biz的Category中。以order_id为Key,如果是base则去和biz关联,如果biz存在则代表能够关联上,这时发送关联后的完整数据,同时删除该主键(order_key)记录;如果biz中不存在,则说明没关联上,这时可能biz的数据延迟或者是丢失,为了保证主数据的准确性,这时我们只发送base的数据,缓存中的数据保留不被删除。如果这条消息是biz,则首先会更新缓存中该主键的biz记录,然后去和base关联,关联上则发送同时删除base中数据,否则不发送。此时我们会根据ES的Update特性去更新之前的数据。从现实效果来看保证了99.2%的数据完整性,符合预期。

数据写入

在Topic2es的数据推送中,通过DataLinkString工具(底层Spark Streaming)实现了Kafka2es的微批次同步,一方面通过多并发batch写入ES获得了良好的吞吐,另一方面提供了5秒的实时写入效率,保证了ES查询的实时可见。同时我们也维护了Kafka的Offset,可以提供At lease once的同步服务,并结合ES的主键,可以做到Exactly once,有效解决了数据重复问题。

ES索引设计及优化

在数据写入ES过程中,由于数据量大,索引时间区间长,在建设索引时需要考虑合理设计保证查询效率,因此主要有以下三点优化:

  • 写入优化 在通过DataLinkString写入ES时,在集群可接受的范围内,数据Shuffle后再分组,增加Client并发数,提升写入效率。
  • 数据结构化 根据需要设计了索引的模版,使用了最小的足够用的数据类型。
  • 按天建索引 通过模版按天建索引,避免影响磁盘IO效率,同时通过别名兼容搜索一致性。
  • 设置合理的分片和副本数 如果分片数过少或过多都会导致检索比较慢。分片数过多会导致检索时打开比较多的文件,另外也会影响多台服务器之间通讯。而分片数过少为导至单个分片索引过大,所以检索速度慢。在确定分片数之前需要进行单服务单索引单分片的测试。 我们根据 索引分片数=数据总量/单分片数 设置了合理的分片数。

实时数据仓库模型

整个实时数据开发遵循大交通实时数仓的分层设计,在此也做一下简单介绍,实时数仓架构如下:

图6 实时数仓架构

ODS层:包含美团页面流量日志、模块事件日志以及用户操作的Binlog信息日志,是直接从业务系统采集过来的原始数据。
事实明细层:根据主题和业务过程,生成订单事实和流量事实。
汇总层:对明细层的数据扩展业务常用的维度信息,形成主题宽表。
App层:针对不同应用在汇总层基础上加工扩展的聚合数据,如火车票在抢票业务下的交易数据汇总信息。

规范建模后,业务需求来临时,只需要在App层建模即可,底层数据统一维护。

中台服务层

后台服务主要实现 登陆验证和权限验证(UPM)、指标逻辑计算和API、预计算服务、数据质量监控、数据审计功能。由于数据量大且实时性要求较高,在实现过程遇到如下挑战:

  • 如何保证查询响应性能。
  • 服务发生故障后,数据降级方案。
  • 数据监控预警方案及数据出现问题解决方案。

针对以上问题,下面进行一一详述:

性能响应优化

服务层处理数据过程中,由于数据量大,在查询时需要一定的响应时间,所以在保证响应性能方面,主要做了以下优化:


图7 性能响应优化

  1. 项目初始由于数据量不是很大,采用单线程直接查询ES,能够满足需求。
  2. 随着节假日来临,数据量大增,并行查询人次增多,查询响应变慢,无法快速响应结果,因此引入缓存技术,将中间结果进行缓存,并在缓存有效期内,直接读取缓存数据大大提高了时间效率;并且在此基础上,引入Master-Worker多线程模式,将多指标查询拆分,并行查询ES,使得查询响应大大提高。
  3. 虽然问题得到解决,但仍存在一个问题,就是每次都是现查ES及部分中间缓存结果,尤其是第一次查询,需要完全走ES,这样就会让第一个查询数据的用户体验较差,因此引入预计算服务,通过定时调度任务,将部分重要维度下的指标进行预计算放入缓存,用户查询时直接读取缓存数据。而一些不常用的维度下的数据,采用的策略是,第一个用户查询时现查ES,并将结果数据预加载到缓存,后续所有用户再次查询直接读缓存数据,这样既能保证用户体验,也不至于占用太多缓存空间。

数据降级方案

使用缓存避免不了出现一些问题,比如缓存失效、缓存雪崩等问题,针对缓存雪崩问题,通过设置不同Key的过期时间能够很好的解决;而对于缓存数据失效,我们有自己的数据降级方案,具体方案如下:


图8 数据降级方案

预计算数据会分别在Redis、Tair和本地缓存中存储一份以保证查询效率,当查询Redis数据不存在时,会去Tair中读取数据,Tair也为空时,会读取本地缓存,只有当本地缓存数据也为空时,才会现查ES做聚合计算,这样也会降低ES的查询压力。

数据监控

实时监控预警非常重要,在数据出现问题时,一方面能够及时通知我们快速定位修复数据,另一方面也能够及时周知业务同学,避免做出错误分析。基于此,我们做了两方面的实时监控,其一是对源实时流在Storm处理层面的监控,确保源实时流正确生产;其二是对展示的汇总数据进行监控,确保产品展示指标数据正常。
针对数据出现问题预警,我们在解决方案上规范了流程:

  1. 监控报警机制及时周知相关同学。
  2. 第一时间通过产品上方的黄条提示用户哪些数据异常。
  3. 快速定位问题,给出修复方案。
    目前对于实时异常数据的修补,主要有两种方法:
    a. 针对特殊情况的数据修补方案第一灵活指定Offset,重新消费Kafka数据。
    b. 预留了Hive2es的准实时重导功能,确保生产数据的准确和完整。

数据安全

在以数据取胜的时代,数据的安全不言而喻,我们采用公司提供的UPM权限接口进行二级权限管理并加入审计功能及水印功能,能够准确记录用户的所有访问以及操作记录,并且将日志数据格式化到数据库中,进行实时监控分析。

总结

实时数据可以为业务特定场景分析决策提供巨大支持,尤其对于大交通节假日及春运期间。在大交通实时战场沙盘产品化过程中,我们投入了大量的思考和实践,主要取得以下收益:

  1. 可视化的产品,为业务方实时分析提供极大便利,取得较好的反馈。
  2. 优化实时数据仓库建设,合理分层建模,规范命名设计,统一维度建设和指标口径,对外提供统一接口,保证数据规范准确。
  3. 在Storm框架下实时开发和数据写入方面积累了一定的经验。
  4. 服务层支持可配置信息,可以灵活配置个性化信息。
  5. 服务层性能及获取数据策略的优化,为用户带来更好的产品体验。

加入我们

最后插播一个招聘广告,我们是一群擅长大数据领域数据建设、数仓建设、数据治理及数据BI应用建设的工程师,期待更多能手加入,感兴趣的可以投递个人简历到邮箱:yangguang09#meituan.com,欢迎您的加入。

作者介绍

娣娣,美团点评数据开发工程师,2015年加入美团,从事数据仓库建设、大数据产品开发工作。
晓磊,美团点评数据开发工程师,2017年加入美团,从事数据仓库建设、大数据产品开发工作。

原文出自:https://tech.meituan.com/archives

分布式块存储系统Ursa的设计与实现

美团点评阅读(1945)

引言

云硬盘对IaaS云计算平台有至关重要的作用,几乎已成为必备组件,如亚马逊的EBS(Elastic Block Store)、阿里云的盘古、OpenStack中的Cinder等。云硬盘可为云计算平台带来许多优良特性,如更高的数据可靠性和可用性、灵活的数据快照功能、更好的虚拟机动态迁移支持、更短的主机故障恢复时间等等。随着万兆以太网逐渐普及,云硬盘的各项优势得到加强和凸显,其必要性变得十分强烈。

云硬盘的底层通常是分布式块存储系统,目前开源领域有一些此类项目,如Ceph RBD、Sheepdog。另外MooseFS和GlusterFS虽然叫做文件系统,但由于其特性与块存储系统接近,也能用于支持云硬盘。我们在测评中发现,这些开源项目均存在一些问题,使得它们都难以直接应用在大规模的生产系统当中。例如Ceph RBD的效率较低(CPU使用过高);Sheepdog在压力测试中出现了数据丢失的现象;MooseFS的POSIX语义支持、基于FUSE的架构、不完全开源的2.0版本等问题给它自身带来了许多的局限性;GlusterFS与Ceph同属红帽收购的开源存储系统,主要用于scale-out文件存储场景,在云计算领域使用不多。此外,这些存储系统都难以充发挥用万兆网卡和SSD的性能潜力,难以在未来承担重任。

由于以上原因,美团云研发了全新的分布式块存储系统Ursa,通过简单稳固的系统架构、高效的代码实现以及对各种非典型场景的仔细考虑,实现了高可靠、高可用、高性能、低开销、可扩展、易运维、易维护等等目标。Ursa的名字源于DotA中的熊战士,他具有极高的攻击速度、攻击力和生命值,分别隐喻存储系统中的IOPS、吞吐率和稳定性。

分布式块存储相关项目与技术

2.1 Ceph

(主要参考:https://www.ustack.com/blog/ceph_infra/

Ceph项目起源于其创始人Sage Weil在加州大学Santa Cruz分校攻读博士期间的研究课题。项目的起始时间为2004年。在2006年的OSDI学术会议上,Sage发表了关于Ceph的论文,并提供了项目的下载链接,由此开始广为人知。2010年Ceph客户端部分代码正式进入Linux kernel 2.6.34。

Ceph同时提供对象、块和文件这三个层次的分布式存储服务,其中只有块层存储与我们相关。由于块存储在IaaS云计算系统中占有重要地位,Ceph在近些年的关注度得到显著提高。许多云计算系统实例基于Ceph提供块存储服务,如UnitedStack、Mirantis OpenStack等。

Ceph性能测试

  • 测试版本:0.81
  • 操作系统:CentOS 6.x
  • 测试工具:fio
  • 服务器配置:
    • CPU: Intel Xeon E5-2650v2 @ 2.6GHz
    • RAM: 96GB
    • NIC: 10 GbE
    • HDD: 6 NL SAS, 7200 RPM
    • RAID Controller: Dell H710p (LSI 2208 with 1GB NVRAM)
  • 服务器数量:4,其中一个为兼职客户端

注意:由于客户端位于一个存储服务器上,所以有1/4的吞吐率不经过网卡。

测试结果如下:

  • 读IOPS:16 407(此时客户端CPU占用率超过500%,5台服务器CPU总使用率接近500%)
  • 写IOPS:941
  • 顺序读吞吐率:218 859 KB/s
  • 顺序写吞吐率:67 242 KB/s
  • 顺序读延迟:1.6ms (664 IOPS)
  • 顺序写延迟:4.4ms (225 IOPS)
  • 网络ping值:0.1324ms
  • 本地硬盘顺序读写延迟:0.03332ms (29 126 IOPS)

从测试来看,Ceph的读吞吐率正常,然而写吞吐率不足读的1/3,性能偏低;读写延迟比显著大于网络延迟与磁盘I/O延迟之和;CPU占用率过高。

2.2 Sheepdog

(主要参考:http://peterylh.blog.163.com/blog/static/12033201221594937257/

Sheepdog是由日本NTT实验室的Morita Kazutaka专为虚拟化平台创立的分布式块存储开源项目, 于2009年开源[1]。从2011年9月开始, 一些淘宝的工程师加入了Sheepdog项目,以及相关开源项目比如Corosync、Accord的开发。Sheepdog主要由两部分组成:集群管理和存储服务,其中集群管理目前使用Corosync或者Zookper来完成,存储服务是全新实现的。

Sheepdog采用无中心节点的全对称架构,基于一致性哈希实现从ObjectID到存储节点的定位:每个节点划分成多个虚拟节点,虚拟节点和ObjectID一样,采用64位整数唯一标识,每个虚拟节点负责一段包含节点ID在内的ObjectID区间。DataObject副本存在ObjectID对应的虚拟节点,及在后续的几个节点上。Sheepdog无单点故障问题,存储容量和性能均可线性扩展,新增节点通过简单配置即可加入集群,并且Sheepdog自动实现负载均衡,节点故障时可自动发现并进行副本修复,还直接支持QEMU/KVM。

Sheepdog的服务进程既承担数据服务的职责,同时也是客户端(QEMU)访问数据的gateway。QEMU的Sheepdog driver将对volume的请求转换成为对object的请求,然后通过UNIX domain socket或者TCP socket连接一个Sheepdog服务进程,并将访问请求发给该进程来完成后续步骤。Sheepdog的服务进程还可以开启数据缓存功能,以减少网络I/O。Sheepdog的I/O路径是“client<->gateway<->object manager(s)”,读操作可以在任意副本完成,更新操作并行的发往所有副本, 当所有副本都更新成功之后,gateway才告诉客户端更新操作成功。

Sheepdog的数据可靠性问题

我们对Sheepdog开展了可靠性、可用性测试。在测试中有共3台服务器,每台配有6个机械硬盘,配置好Sheepdog之后,每台服务器启动10个VM,每个VM内无限循环运行fio分别执行小块随机读、写和大块顺序读、写的测试。

在执行压力测试一周后,对集群中的全部数据进行一致性检测(collie cluster check),发现有些数据块副本与另外2个不一致(“fixed replica …”),有些数据块的3个各不相同(“no majority of …”):

[root@node3-10gtest ~]# collie cluster check
fix vdi test1-3
99.9 % [=================================================================>] 50 GB / 50 GB 
fixed replica 3e563000000fca
99.9 % [=================================================================>] 50 GB / 50 GB      
fixed replica 3e563000000fec
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e5630000026f5
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e563000002da6
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e563000001e8c
100.0 % [================================================================>] 50 GB / 50 GB      
fixed replica 3e563000001530
...
fix vdi test2-9
50.9 % [=================================>                                ] 25 GB / 50 GB      
no majority of d781e300000123
51.0 % [===================================>                              ] 26 GB / 50 GB      
no majority of d781e300000159
51.2 % [===================================>                              ] 26 GB / 50 GB      
no majority of d781e30000018a
53.2 % [====================================>                             ] 27 GB / 50 GB      
…

2.3 MooseFS

(主要参考:http://peterylh.blog.163.com/blog/static/12033201251791139592/

MooseFS是容错的分布式文件系统,通过FUSE支持标准POSIX文件系统接口。 MooseFS的架构类似于GFS,由四个部分组成:

  • 管理服务器Master:类似于GFS的Master,主要有两个功能:(1)存储文件和目录元数据,文件元数据包括文件大小、属性、对应的Chunk等;(2)管理集群成员关系和Chunk元数据信息,包括Chunk存储、版本、Lease等。
  • 元数据备份服务器Metalogger Server:根据元数据文件和log实时备份Master元数据。
  • 存储服务器Chunk Server:负责存储Chunk,提供Chunk读写能力。Chunk文件默认为64MB大小。
  • 客户端Client:以FUSE方式挂到本地文件系统,实现标准文件系统接口。

MooseFS本地不会缓存Chunk信息, 每次读写操作都会访问Master, Master的压力较大。此外MooseFS写操作流程较长且开销较高。MooseFS支持快照,但是以整个Chunk为单位进行CoW(Copy-on-Write),可能造成响应时间恶化,补救办法是以牺牲系统规模为代价,降低Chunk大小。

MooseFS基于FUSE提供POSIX语义支持,已有应用可以不经修改直接迁移到MooseFS之上,这给应用带来极大的便利。然而FUSE也带来了一些负面作用,比如POSIX语义对于块存储来说并不需要,FUSE会带来额外开销等等。

2.4 GFS/HDFS

(主要参考:http://www.nosqlnotes.net/archives/119

HDFS基本可以认为是GFS的一个简化开源实现,二者因此有很多相似之处。首先,GFS和HDFS都采用单一主控机+多台工作机的模式,由一台主控机(Master)存储系统全部元数据,并实现数据的分布、复制、备份决策,主控机还实现了元数据的checkpoint和操作日志记录及回放功能。工作机存储数据,并根据主控机的指令进行数据存储、数据迁移和数据计算等。其次,GFS和HDFS都通过数据分块和复制(多副本,一般是3)来提供更高的可靠性和更高的性能。当其中一个副本不可用时,系统都提供副本自动复制功能。同时,针对数据读多于写的特点,读服务被分配到多个副本所在机器,提供了系统的整体性能。最后,GFS和HDFS都提供了一个树结构的文件系统,实现了类似与Linux下的文件复制、改名、移动、创建、删除操作以及简单的权限管理等。

然而,GFS和HDFS在关键点的设计上差异很大,HDFS为了规避GFS的复杂度进行了很多简化。例如HDFS不支持并发追加和集群快照,早期HDFS的NameNode(即Master)没原生HA功能。总之,HDFS基本可以认为是GFS的简化版,由于时间及应用场景等各方面的原因对GFS的功能做了一定的简化,大大降低了复杂度。

2.5 HLFS

(主要参考:http://peterylh.blog.163.com/blog/static/120332012226104116710/

HLFS(HDFS Log-structured File System)是一个开源分布式块存储系统,其最大特色是结合了LFS和HDFS。HDFS提供了可靠、随时可扩展的文件服务,而HLFS通过Log-structured技术弥补了HDFS不能随机更新的缺憾。在HLFS中,虚拟磁盘对应一个文件, 文件长度能够超过TB级别,客户端支持Linux和Xen,其中Linux基于NBD实现,Xen基于blktap2实现,客户端通过类POSIX接口libHLFS与服务端通讯。HLFS主要特性包括多副本、动态扩容、故障透明处理和快照。

HLFS性能较低。首先,非原地更新必然导致数据块在物理上非连续存放,因此读I/O比较随机,顺序读性能下降。其次,虽然从单个文件角度看来,写I/O是顺序的,但是在HDFS的Chunk Server服务了多个HLFS文件,因此从它的角度来看,I/O仍然是随机的。第三,写延迟问题,HDFS面向大文件设计,小文件写延时不够优化。第四,垃圾回收的影响,垃圾回收需要读取和写入大量数据,对正常写操作造成较大影响。此外,按照目前实现,相同段上的垃圾回收和读写请求不能并发,垃圾回收算法对正常操作的干扰较大。

2.6 iSCSI、FCoE、AoE、NBD

iSCSI、FCoE、AoE、NBD等都是用来支持通过网络访问块设备的协议,它们都采用C/S架构,无法直接支持分布式块存储系统。

Ursa的设计与实现

分布式块存储系统给虚拟机提供的是虚拟硬盘服务,因而有如下设计目标:

  • 大文件存储:虚拟硬盘实际通常GB级别以上,小于1GB是罕见情况
  • 需要支持随机读写访问,不需支持追加写,需要支持resize
  • 通常情况下,文件由一个进程独占读写访问;数据块可被共享只读访问
  • 高可靠,高可用:任意两个服务器同时出现故障不影响数据的可靠性和可用性
  • 能发挥出新型硬件的性能优势,如万兆网络、SSD
  • 由于应用需求未知,同时需要优化吞吐率和IOPS
  • 高效率:降低资源消耗,就降低了成本

除了上述源于虚拟硬盘的直接需求意外,分布式块存储还需要支持下列功能:

  • 快照:可给一个文件在不同时刻建立多个独立的快照
  • 克隆:可将一个文件或快照在逻辑上复制成独立的多份
  • 精简配置(thin-provisioning):只有存储数据的部分才真正占用空间

3.1 系统架构

分布式存储系统总体架构可以分为有master(元数据服务器)和无master两大类。有master架构在技术上较为简单清晰,但存在单点失效以及潜在的性能瓶颈问题;无master架构可以消除单点失效和性能瓶颈问题,然而在技术上却较为复杂,并且在数据布局方面具有较多局限性。块存储系统对master的压力不大,同时master的单点故障问题可采用一些现有成熟技术解决,因而美团EBS的总体架构使用有master的类型。这一架构与GFS、HDFS、MooseFS等系统的架构属于同一类型。

如图1所示,美团EBS系统包括M、S和C三个部分,分别代表Master、Chunk Server和Client。Master中记录的元数据包括3种:(1)关于volume的信息,如类型、大小、创建时间、包含哪些数据chunk等等;(2)关于chunk的信息,如大小、创建时间、所在位置等;(3)关于Chunk Server的信息,如IP地址、端口、数据存储量、I/O负载、空间剩余量等。这3种信息当中,关于volume的信息是持久化保存的,其余两种均为暂存信息,通过Chunk Server上报。

I/O Stack

在元数据中,关于volume的信息非常重要,必须持久化保存;关于chunk的信息和Chunk Server的信息是时变的,并且是由Chunk Server上报的,因而没必要持久化保存。根据这样的分析,我们将关于volume的信息保存在MySQL当中,其他元数据保存在Redis当中,余下的集群管理功能由Manager完成。Master == Manager + MySQL + Redis,其中MySQL使用双机主从配置,Redis使用官方提供的标准cluster功能。

3.2 CAP取舍

C、A、P分别代表Consistency、Availability和Partition-tolerance。分布式系统很难同时在这三个方面做到很高的保障,通常要在仔细分析应用需求的基础之上对CAP做出取舍。

块存储系统主要用于提供云硬盘服务,每块云硬盘通常只会挂载到1台VM之上,不存在多机器并发读写的情况,因而其典型应用场景对一致性的需求较低。针对这一特性,我们可以在设计上舍C而取AP。

对于多机并行访问云硬盘的使用模式,若数据是只读的则无需额外处理;若数据有写有读,甚至是多写多读,则需要在上层引入分布式锁,来确保数据一致性和完整性。这种使用模式在SAN领域并不少见,其典型应用场景是多机同时挂载一个网络硬盘,并通过集群文件系统(而不是常见的单机文件系统)来协调访问存储空间。集群文件系统内部会使用分布式锁来确保数据操作的正确性,所以我们舍C的设计决策不会影响多机并行访问的使用模式。

3.3 并发模型

并发(不是并行!)模型的选择和设计无法作为实现细节隐藏在局部,它会影响到程序代码的各个部分,从底层到上层。基本的并发模型只有这样几种:事件驱动、多线程、多进程以及较为小众的多协程。

事件驱动模型是一种更接近硬件工作模式的并发模型,具有更高的执行效率,是高性能网络服务的普遍选择。为能充分发挥万兆网络和SSD的性能潜力,我们必须利用多核心并行服务,因而需要选用多线程或者多进程模型。由于多进程模型更加简单,进程天然是故障传播的屏障,这两方面都十分有利于提高软件的健壮性;并且我们也很容易对业务进行横向拆分,做到互相没有依赖,也不需要交互,所以我们选择了多进程模型,与事件驱动一起构成混合模型。

协程在现实中的应用并不多,很多语言/开发生态甚至不支持协程,然而协程在一些特定领域其实具有更好的适用性。比如,QEMU/KVM在磁盘I/O方面的并发执行完全是由协程负责的,即便某些block driver只提供了事件驱动的接口(如Ceph RBD),QEMU/KVM也会自动把它们转化封装成多协程模式。实践表明,在并发I/O领域,多协程模型可以同时在性能和易用性方面取得非常好的效果,所以我们做了跟QEMU/KVM类似的选择——在底层将事件驱动模型转换成了多协程模型,最终形成了“多进程+多协程+事件驱动”的混合并发模型。

3.4 存储结构

如图所示,Ursa中的存储结构与GFS/HDFS相似,存储卷由64MB(可配置)大小的chunk组成。Ursa系统中的数据复制、故障检测与修复均在chunk层次进行。系统中,每3个(可配置)chunk组成一组,互为备份。每2个chunk组构成一个stripe组,实现条带化交错读写,提高单客户端顺序读写性能。Ursa在I/O栈上层添加Cache模块,可将最常用的数据缓存在客户端本地的SSD介质当中,当访问命中缓存时可大大提高性能。

I/O Stack

3.5 写入策略

最常见的写入策略有两种:(1)客户端直接写多个副本到各个Chunk Server,如图(a)所示,Sheepdog采用此种办法;(2)客户端写第一个副本,并将写请求依次传递下去,如图(b)所示。这两种方法各有利弊:前者通常具有较小的写入延迟,但吞吐率最高只能达到网络带宽的1/3;后者的吞吐率可以达到100%网络带宽,却具有较高的写入延迟。

I/O Stack

由于Ursa可能用于支持各种类型的应用,必须同时面向吞吐率和带宽进行优化,所以我们设计采用了一种分叉式的写入策略:如图(c)所示,客户端使用WRITE_REPLICATE请求求将数据写到第一个副本,称为primary,然后由primary负责将数据分别写到其他副本。这样Ursa可以在吞吐率和延迟两方面取得较好的均衡。为确保数据可靠性,写操作会等所有副本的写操作都完成之后才能返回。

3.6 无状态服务

Chunk Server内部几乎不保存状态,通常情况下各个请求之间是完全独立执行的,并且重启Chunk Server不会影响后续请求的执行。这样的Chunk Server不但具有更高的鲁棒性,而且具有更高的扩展性。许多其他网络服务、协议的设计都遵循了无状态的原则。

3.7 模块

如下图所示,Ursa中的I/O功能模块的组织采用decorator模式,即所有模块都实现了IStore抽象接口,其中负责直接与Chunk Server通信的模块属于decorator模式中的concrete component,其余模块均为concrete decorator。所有的decorator都保存数量不等的指向其他模块的指针(IStore指针)。

I/O Stack

在运行时,Ursa的I/O栈层次结构的对象图如下所示。

I/O Stack

3.8 产品界面

I/O Stack

性能实测

如下图所示,测试环境由万兆以太网、1台Client和3台Chunk Server组成,chunk文件存在tmpfs上,即所有写操作不落盘,写到内存为止。选择tmpfs主要是为了避免硬盘的I/O速度的局限性,以便测试Ursa在极限情况下的表现。

I/O Stack

测试环境的网络延迟如下:

I/O Stack

在上述环境中,用fio分别测试了读写操作的吞吐率、IOPS以及I/O延迟,测试参数与结果如下:

I/O Stack

从测试结果可以看出:
(1). Ursa在吞吐率测试中可以轻易接近网卡理论带宽;
(2). Ursa的IOPS性能可以达到SSD的水准;
(3). Ursa的读延迟接近ping值,写操作需要写3个副本,延迟比读高68%。

作为对比,我们在测试Ceph的过程中监测到服务端CPU占用情况如下:

I/O Stack

此时机器上5个存储服务进程ceph-osd共占用123.7%的CPU资源,提供了4 101的读IOPS服务;而Ursa的服务进程只消耗了43%的CPU资源,提供了61 340读IOPS服务,运行效率比Ceph高43倍。在客户端方面,Ceph消耗了500%+的CPU资源,得到了16 407读IOPS;而Ursa只消耗了96%的CPU资源,得到了61 340读IOPS,运行效率比Ceph高21倍

总结与展望

Ursa从零开始动手开发到内部上线只经历了9个月的时间,虽然基本功能、性能都已达到预期,但仍有许多需要进一步开发的地方。一个重要的方向是对SSD的支持。虽然将HDD简单替换为SSD,不修改Ursa的任何代码、配置就可以运行,并取得性能上的显著改善,然而SSD在性能、成本、寿命等方面与HDD差异巨大,Ursa必须做出针对性的优化才能使SSD扬长避短。另一个重要方向是对大量VM同时启动的更好的支持。如果同时有上百台相同的VM从Ursa启动(即系统盘放在Ursa上),会在短时间内有大量读请求访问相同的数据集(启动文件),形成局部热点,此时相关的Chunk Server服务能力会出现不足,导致启动变慢。由于各个VM所需的启动数据基本相同,我们可以采用“一传十,十传百”的方式组织一个应用层组播树overlay网络来进行数据分发,这样可以从容应对热点数据访问问题。随着一步步的完善,相信Ursa将在云平台的运营当中起到越来越重要的作用。

原文出自:https://tech.meituan.com/archives

Spark在美团的实践

美团点评阅读(1832)

本文已发表在《程序员》杂志2016年4月期。

前言

美团是数据驱动的互联网服务,用户每天在美团上的点击、浏览、下单支付行为都会产生海量的日志,这些日志数据将被汇总处理、分析、挖掘与学习,为美团的各种推荐、搜索系统甚至公司战略目标制定提供数据支持。大数据处理渗透到了美团各业务线的各种应用场景,选择合适、高效的数据处理引擎能够大大提高数据生产的效率,进而间接或直接提升相关团队的工作效率。
美团最初的数据处理以Hive SQL为主,底层计算引擎为MapReduce,部分相对复杂的业务会由工程师编写MapReduce程序实现。随着业务的发展,单纯的Hive SQL查询或者MapReduce程序已经越来越难以满足数据处理和分析的需求。
一方面,MapReduce计算模型对多轮迭代的DAG作业支持不给力,每轮迭代都需要将数据落盘,极大地影响了作业执行效率,另外只提供Map和Reduce这两种计算因子,使得用户在实现迭代式计算(比如:机器学习算法)时成本高且效率低。
另一方面,在数据仓库的按天生产中,由于某些原始日志是半结构化或者非结构化数据,因此,对其进行清洗和转换操作时,需要结合SQL查询以及复杂的过程式逻辑处理,这部分工作之前是由Hive SQL结合Python脚本来完成。这种方式存在效率问题,当数据量比较大的时候,流程的运行时间较长,这些ETL流程通常处于比较上游的位置,会直接影响到一系列下游的完成时间以及各种重要数据报表的生成。
基于以上原因,美团在2014年的时候引入了Spark。为了充分利用现有Hadoop集群的资源,我们采用了Spark on Yarn模式,所有的Spark app以及MapReduce作业会通过Yarn统一调度执行。Spark在美团数据平台架构中的位置如图所示:

 离线计算平台架构图

经过近两年的推广和发展,从最开始只有少数团队尝试用Spark解决数据处理、机器学习等问题,到现在已经覆盖了美团各大业务线的各种应用场景。从上游的ETL生产,到下游的SQL查询分析以及机器学习等,Spark正在逐步替代MapReduce作业,成为美团大数据处理的主流计算引擎。目前美团Hadoop集群用户每天提交的Spark作业数和MapReduce作业数比例为4:1,对于一些上游的Hive ETL流程,迁移到Spark之后,在相同的资源使用情况下,作业执行速度提升了十倍,极大地提升了业务方的生产效率。
下面我们将介绍Spark在美团的实践,包括我们基于Spark所做的平台化工作以及Spark在生产环境下的应用案例。其中包含Zeppelin结合的交互式开发平台,也有使用Spark任务完成的ETL数据转换工具,数据挖掘组基于Spark开发了特征平台和数据挖掘平台,另外还有基于Spark的交互式用户行为分析系统以及在SEM投放服务中的应用,以下是详细介绍。

Spark交互式开发平台

在推广如何使用Spark的过程中,我们总结了用户开发应用的主要需求:

  1. 数据调研:在正式开发程序之前,首先需要认识待处理的业务数据,包括:数据格式,类型(若以表结构存储则对应到字段类型)、存储方式、有无脏数据,甚至分析根据业务逻辑实现是否可能存在数据倾斜等等。这个需求十分基础且重要,只有对数据有充分的掌控,才能写出高效的Spark代码;
  2. 代码调试:业务的编码实现很难保证一蹴而就,可能需要不断地调试;如果每次少量的修改,测试代码都需要经过编译、打包、提交线上,会对用户的开发效率影响是非常大的;
  3. 联合开发:对于一整个业务的实现,一般会有多方的协作,这时候需要能有一个方便的代码和执行结果共享的途径,用于分享各自的想法和试验结论。

基于这些需求,我们调研了现有的开源系统,最终选择了Apache的孵化项目Zeppelin,将其作为基于Spark的交互式开发平台。Zeppelin整合了Spark,Markdown,Shell,Angular等引擎,集成了数据分析和可视化等功能。

 Zeppelin实例

我们在原生的Zeppelin上增加了用户登陆认证、用户行为日志审计、权限管理以及执行Spark作业资源隔离,打造了一个美团的Spark的交互式开发平台,不同的用户可以在该平台上调研数据、调试程序、共享代码和结论。

集成在Zeppelin的Spark提供了三种解释器:Spark、Pyspark、SQL,分别适用于编写Scala、Python、SQL代码。对于上述的数据调研需求,无论是程序设计之初,还是编码实现过程中,当需要检索数据信息时,通过Zeppelin提供的SQL接口可以很便利的获取到分析结果;另外,Zeppelin中Scala和Python解释器自身的交互式特性满足了用户对Spark和Pyspark分步调试的需求,同时由于Zeppelin可以直接连接线上集群,因此可以满足用户对线上数据的读写处理请求;最后,Zeppelin使用Web Socket通信,用户只需要简单地发送要分享内容所在的http链接,所有接受者就可以同步感知代码修改,运行结果等,实现多个开发者协同工作。

Spark作业ETL模板

除了提供平台化的工具以外,我们也会从其他方面来提高用户的开发效率,比如将类似的需求进行封装,提供一个统一的ETL模板,让用户可以很方便的使用Spark实现业务需求。

美团目前的数据生产主体是通过ETL将原始的日志通过清洗、转换等步骤后加载到Hive表中。而很多线上业务需要将Hive表里面的数据以一定的规则组成键值对,导入到Tair中,用于上层应用快速访问。其中大部分的需求逻辑相同,即把Hive表中几个指定字段的值按一定的规则拼接成key值,另外几个字段的值以json字符串的形式作为value值,最后将得到的对写入Tair。

 hive2Tair流程示例

由于Hive表中的数据量一般较大,使用单机程序读取数据和写入Tair效率比较低,因此部分业务方决定使用Spark来实现这套逻辑。最初由业务方的工程师各自用Spark程序实现从Hive读数据,写入到Tair中(以下简称hive2Tair流程),这种情况下存在如下问题:
每个业务方都要自己实现一套逻辑类似的流程,产生大量重复的开发工作;
由于Spark是分布式的计算引擎,因此代码实现和参数设置不当很容易对Tair集群造成巨大压力,影响Tair的正常服务。
基于以上原因,我们开发了Spark版的hive2Tair流程,并将其封装成一个标准的ETL模板,其格式和内容如下所示:

 hive2Tair  ETL模版

source用于指定Hive表源数据,target指定目标Tair的库和表,这两个参数可以用于调度系统解析该ETL的上下游依赖关系,从而很方便地加入到现有的ETL生产体系中。

有了这个模板,用户只需要填写一些基本的信息(包括Hive表来源,组成key的字段列表,组成value的字段列表,目标Tair集群)即可生成一个hive2Tair的ETL流程。整个流程生成过程不需要任何Spark基础,也不需要做任何的代码开发,极大地降低了用户的使用门槛,避免了重复开发,提高了开发效率。该流程执行时会自动生成一个Spark作业,以相对保守的参数运行:默认开启动态资源分配,每个Executor核数为2,内存2GB,最大Executor数设置为100。如果对于性能有很高的要求,并且申请的Tair集群比较大,那么可以使用一些调优参数来提升写入的性能。目前我们仅对用户暴露了设置Executor数量以及每个Executor内存的接口,并且设置了一个相对安全的最大值规定,避免由于参数设置不合理给Hadoop集群以及Tair集群造成异常压力。

基于Spark的用户特征平台

在没有特征平台之前,各个数据挖掘人员按照各自项目的需求提取用户特征数据,主要是通过美团的ETL调度平台按月/天来完成数据的提取。

但从用户特征来看,其实会有很多的重复工作,不同的项目需要的用户特征其实有很多是一样的,为了减少冗余的提取工作,也为了节省计算资源,建立特征平台的需求随之诞生,特征平台只需要聚合各个开发人员已经提取的特征数据,并提供给其他人使用。特征平台主要使用Spark的批处理功能来完成数据的提取和聚合。
开发人员提取特征主要还是通过ETL来完成,有些数据使用Spark来处理,比如用户搜索关键词的统计。
开发人员提供的特征数据,需要按照平台提供的配置文件格式添加到特征库,比如在图团购的配置文件中,团购业务中有一个用户24小时时段支付的次数特征,输入就是一个生成好的特征表,开发人员通过测试验证无误之后,即完成了数据上线;另外对于有些特征,只需要从现有的表中提取部分特征数据,开发人员也只需要简单的配置即可完成。

 特征平台 数据流向图

在图中,我们可以看到特征聚合分两层,第一层是各个业务数据内部聚合,比如团购的数据配置文件中会有很多的团购特征、购买、浏览等分散在不同的表中,每个业务都会有独立的Spark任务来完成聚合,构成一个用户团购特征表;特征聚合是一个典型的join任务,对比MapReduce性能提升了10倍左右。第二层是把各个业务表数据再进行一次聚合,生成最终的用户特征数据表。
特征库中的特征是可视化的,我们在聚合特征时就会统计特征覆盖的人数,特征的最大最小数值等,然后同步到RDB,这样管理人员和开发者都能通过可视化来直观地了解特征。 另外,我们还提供特征监测和告警,使用最近7天的特征统计数据,对比各个特征昨天和今天的覆盖人数,是增多了还是减少了,比如性别为女这个特征的覆盖人数,如果发现今天的覆盖人数比昨天低了1%(比如昨天6亿用户,女性2亿,那么人数降低了1%*2亿=2百万)突然减少2万女性用户说明数据出现了极大的异常,何况网站的用户数每天都是增长的。这些异常都会通过邮件发送到平台和特征提取的相关人。

Spark数据挖掘平台

数据挖掘平台是完全依赖于用户特征库的,通过特征库提供用户特征,数据挖掘平台对特征进行转换并统一格式输出,就此开发人员可以快速完成模型的开发和迭代,之前需要两周开发一个模型,现在短则需要几个小时,多则几天就能完成。特征的转换包括特征名称的编码,也包括特征值的平滑和归一化,平台也提供特征离散化和特征选择的功能,这些都是使用Spark离线完成。

开发人员拿到训练样本之后,可以使用Spark mllib或者Python sklearn等完成模型训练,得到最优化模型之后,将模型保存为平台定义好的模型存储格式,并提供相关配置参数,通过平台即可完成模型上线,模型可以按天或者按周进行调度。当然如果模型需要重新训练或者其它调整,那么开发者还可以把模型下线。不只如此,平台还提供了一个模型准确率告警的功能,每次模型在预测完成之后,会计算用户提供的样本中预测的准确率,并比较开发者提供的准确率告警阈值,如果低于阈值则发邮件通知开发者,是否需要对模型重新训练。

在开发挖掘平台的模型预测功时能我们走了点弯路,平台的模型预测功能开始是兼容Spark接口的,也就是使用Spark保存和加载模型文件并预测,使用过的人知道Spark mllib的很多API都是私有的开发人员无法直接使用,所以我们这些接口进行封装然后再提供给开发者使用,但也只解决了Spark开发人员的问题,平台还需要兼容其他平台的模型输出和加载以及预测的功能,这让我们面临必需维护一个模型多个接口的问题,开发和维护成本都较高,最后还是放弃了兼容Spark接口的实现方式,我们自己定义了模型的保存格式,以及模型加载和模型预测的功能。

 数据挖掘平台结构图

以上内容介绍了美团基于Spark所做的平台化工作,这些平台和工具是面向全公司所有业务线服务的,旨在避免各团队做无意义的重复性工作,以及提高公司整体的数据生产效率。目前看来效果是比较好的,这些平台和工具在公司内部得到了广泛的认可和应用,当然也有不少的建议,推动我们持续地优化。
随着Spark的发展和推广,从上游的ETL到下游的日常数据统计分析、推荐和搜索系统,越来越多的业务线开始尝试使用Spark进行各种复杂的数据处理和分析工作。下面将以Spark在交互式用户行为分析系统以及SEM投放服务为例,介绍Spark在美团实际业务生产环境下的应用。

Spark在交互式用户行为分析系统中的实践

美团的交互式用户行为分析系统,用于提供对海量的流量数据进行交互式分析的功能,系统的主要用户为公司内部的PM和运营人员。普通的BI类报表系统,只能够提供对聚合后的指标进行查询,比如PV、UV等相关指标。但是PM以及运营人员除了查看一些聚合指标以外,还需要根据自己的需求去分析某一类用户的流量数据,进而了解各种用户群体在App上的行为轨迹。根据这些数据,PM可以优化产品设计,运营人员可以为自己的运营工作提供数据支持,用户核心的几个诉求包括:

  1. 自助查询,不同的PM或运营人员可能随时需要执行各种各样的分析功能,因此系统需要支持用户自助使用。
  2. 响应速度,大部分分析功能都必须在几分钟内完成。
  3. 可视化,可以通过可视化的方式查看分析结果。

要解决上面的几个问题,技术人员需要解决以下两个核心问题:

  1. 海量数据的处理,用户的流量数据全部存储在Hive中,数据量非常庞大,每天的数据量都在数十亿的规模。
  2. 快速计算结果,系统需要能够随时接收用户提交的分析任务,并在几分钟之内计算出他们想要的结果。

要解决上面两个问题,目前可供选择的技术主要有两种:MapReduce和Spark。在初期架构中选择了使用MapReduce这种较为成熟的技术,但是通过测试发现,基于MapReduce开发的复杂分析任务需要数小时才能完成,这会造成极差的用户体验,用户无法接受。

因此我们尝试使用Spark这种内存式的快速大数据计算引擎作为系统架构中的核心部分,主要使用了Spark Core以及Spark SQL两个组件,来实现各种复杂的业务逻辑。实践中发现,虽然Spark的性能非常优秀,但是在目前的发展阶段中,还是或多或少会有一些性能以及OOM方面的问题。因此在项目的开发过程中,对大量Spark作业进行了各种各样的性能调优,包括算子调优、参数调优、shuffle调优以及数据倾斜调优等,最终实现了所有Spark作业的执行时间都在数分钟左右。并且在实践中解决了一些shuffle以及数据倾斜导致的OOM问题,保证了系统的稳定性。

结合上述分析,最终的系统架构与工作流程如下所示:

  1. 用户在系统界面中选择某个分析功能对应的菜单,并进入对应的任务创建界面,然后选择筛选条件和任务参数,并提交任务。
  2. 由于系统需要满足不同类别的用户行为分析功能(目前系统中已经提供了十个以上分析功能),因此需要为每一种分析功能都开发一个Spark作业。
  3. 采用J2EE技术开发了Web服务作为后台系统,在接收到用户提交的任务之后,根据任务类型选择其对应的Spark作业,启动一条子线程来执行Spark-submit命令以提交Spark作业。
  4. Spark作业运行在Yarn集群上,并针对Hive中的海量数据进行计算,最终将计算结果写入数据库中。
  5. 用户通过系统界面查看任务分析结果,J2EE系统负责将数据库中的计算结果返回给界面进行展现。

 交互式用户行为分析系统架构

该系统上线后效果良好:90%的Spark作业运行时间都在5分钟以内,剩下10%的Spark作业运行时间在30分钟左右,该速度足以快速响应用户的分析需求。通过反馈来看,用户体验非常良好。目前每个月该系统都要执行数百个用户行为分析任务,有效并且快速地支持了PM和运营人员的各种分析需求。

Spark在SEM投放服务中的应用

流量技术组负责着美团站外广告的投放技术,目前在SEM、SEO、DSP等多种业务中大量使用了Spark平台,包括离线挖掘、模型训练、流数据处理等。美团SEM(搜索引擎营销)投放着上亿的关键词,一个关键词从被挖掘策略发现开始,就踏上了精彩的SEM之旅。它经过预估模型的筛选,投放到各大搜索引擎,可能因为市场竞争频繁调价,也可能因为效果不佳被迫下线。而这样的旅行,在美团每分钟都在发生。如此大规模的随机“迁徙”能够顺利进行,Spark功不可没。

 SEM服务架构图

Spark不止用于美团SEM的关键词挖掘、预估模型训练、投放效果统计等大家能想到的场景,还罕见地用于关键词的投放服务,这也是本段介绍的重点。一个快速稳定的投放系统是精准营销的基础。

美团早期的SEM投放服务采用的是单机版架构,随着关键词数量的极速增长,旧有服务存在的问题逐渐暴露。受限于各大搜索引擎API的配额(请求频次)、账户结构等规则,投放服务只负责处理API请求是远远不够的,还需要处理大量业务逻辑。单机程序在小数据量的情况下还能通过多进程勉强应对,但对于如此大规模的投放需求,就很难做到“兼顾全局”了。

新版SEM投放服务在15年Q2上线,内部开发代号为Medusa。在Spark平台上搭建的Medusa,全面发挥了Spark大数据处理的优势,提供了高性能高可用的分布式SEM投放服务,具有以下几个特性:

  1. 低门槛,Medusa整体架构的设计思路是提供数据库一样的服务。在接口层,让RD可以像操作本地数据库一样,通过SQL来“增删改查”线上关键词表,并且只需要关心自己的策略标签,不需要关注关键词的物理存储位置。Medusa利用Spark SQL作为服务的接口,提高了服务的易用性,也规范了数据存储,可同时对其他服务提供数据支持。基于Spark开发分布式投放系统,还可以让RD从系统层细节中解放出来,全部代码只有400行。
  2. 高性能、可伸缩,为了达到投放的“时间”、“空间”最优化,Medusa利用Spark预计算出每一个关键词在远程账户中的最佳存储位置,每一次API请求的最佳时间内容。在配额和账号容量有限的情况下,轻松掌控着亿级的在线关键词投放。通过控制Executor数量实现了投放性能的可扩展,并在实战中做到了全渠道4小时全量回滚。
  3. 高可用,有的同学或许会有疑问:API请求适合放到Spark中做吗?因为函数式编程要求函数是没有副作用的纯函数(输入是确定的,输出就是确定的)。这确实是一个问题,Medusa的思路是把请求API封装成独立的模块,让模块尽量做到“纯函数”的无副作用特性,并参考面向轨道编程的思路,将全部请求log重新返回给Spark继续处理,最终落到Hive,以此保证投放的成功率。为了更精准的控制配额消耗,Medusa没有引入单次请求重试机制,并制定了服务降级方案,以极低的数据丢失率,完整地记录了每一个关键词的旅行。

结论和展望

本文我们介绍了美团引入Spark的起源,基于Spark所做的一些平台化工作,以及Spark在美团具体应用场景下的实践。总体而言,Spark由于其灵活的编程接口、高效的内存计算,能够适用于大部分数据处理场景。在推广和使用Spark的过程中,我们踩过不少坑,也遇到过很多问题,但填坑和解决问题的过程,让我们对Spark有了更深入的理解,我们也期待着Spark在更多的应用场景中发挥重要的作用。

原文出自:https://tech.meituan.com/archives

Spark性能优化指南——基础篇

美团点评阅读(1862)

前言

在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark。大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快、性能更高。

然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。

Spark的性能调优实际上是由很多部分组成的,不是调节几个参数就可以立竿见影提升作业性能的。我们需要根据不同的业务场景以及数据情况,对Spark作业进行综合性的分析,然后进行多个方面的调节和优化,才能获得最佳性能。

笔者根据之前的Spark作业开发经验以及实践积累,总结出了一套Spark作业的性能优化方案。整套方案主要分为开发调优、资源调优、数据倾斜调优、shuffle调优几个部分。开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则,是高性能Spark作业的基础;数据倾斜调优,主要讲解了一套完整的用来解决Spark作业数据倾斜的解决方案;shuffle调优,面向的是对Spark的原理有较深层次掌握和研究的同学,主要讲解了如何对Spark作业的shuffle运行过程以及细节进行调优。

本文作为Spark性能优化指南的基础篇,主要讲解开发调优以及资源调优。

开发调优

调优概述

Spark性能优化的第一步,就是要在开发Spark作业的过程中注意和应用一些性能优化的基本原则。开发调优,就是要让大家了解以下一些Spark基本开发原则,包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到自己的Spark作业中。

原则一:避免创建重复的RDD

通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD;以此类推,循环往复,直到计算出最终我们需要的结果。在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。

我们在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。

一些Spark初学者在刚开始开发Spark作业时,或者是有经验的工程师在开发RDD lineage极其冗长的Spark作业时,可能会忘了自己之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD。这就意味着,我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。

一个简单的例子

// 需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次reduce操作。也就是说,需要对一份数据执行两次算子操作。

// 错误的做法:对于同一份数据执行多次算子操作时,创建多个RDD。
// 这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,然后分别对每个RDD都执行了一个算子操作。
// 这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)

// 正确的用法:对于一份数据执行多次算子操作时,只使用一个RDD。
// 这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个RDD,然后对这一个RDD执行了多次算子操作。
// 但是要注意到这里为止优化还没有结束,由于rdd1被执行了两次算子操作,第二次执行reduce操作的时候,还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销。
// 要彻底解决这个问题,必须结合“原则三:对多次使用的RDD进行持久化”,才能保证一个RDD被多次使用时只被计算一次。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
rdd1.reduce(...)

原则二:尽可能复用同一个RDD

除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

一个简单的例子

// 错误的做法。

// 有一个<Long, String>格式的RDD,即rdd1。
// 接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)

// 分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)

// 正确的做法。

// 上面这个case中,其实rdd1和rdd2的区别无非就是数据格式不同而已,rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。
// 此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。

// 其实在这种情况下完全可以复用同一个RDD。
// 我们可以使用rdd1,既做reduceByKey操作,也做map操作。
// 在进行第二个map操作时,只使用每个数据的tuple._2,也就是rdd1中的value值,即可。
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

// 第二种方式相较于第一种方式而言,很明显减少了一次rdd2的计算开销。
// 但是到这里为止,优化还没有结束,对rdd1我们还是执行了两次算子操作,rdd1实际上还是会被计算两次。
// 因此还需要配合“原则三:对多次使用的RDD进行持久化”进行使用,才能保证一个RDD被多次使用时只被计算一次。

原则三:对多次使用的RDD进行持久化

当你在Spark代码中多次对一个RDD做了算子操作后,恭喜,你已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。

Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。

因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。

对多次使用的RDD进行持久化的代码示例

// 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。

// 正确的做法。
// cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
// 此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。
// 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)

// persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
// 比如说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。
// 而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。
// 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别。

Spark的持久化级别

持久化级别含义解释
MEMORY_ONLY使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
MEMORY_AND_DISK使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
DISK_ONLY使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

如何选择一种最合适的持久化策略

  • 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

  • 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

  • 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

  • 通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

原则四:尽量避免使用shuffle类算子

如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。

shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。

因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

Broadcast与map进行join代码示例

// 传统的join操作会导致shuffle操作。
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)

// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。

原则五:使用map-side预聚合的shuffle操作

如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。

所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

比如如下两幅图,就是典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。

groupByKey实现wordcount原理

reduceByKey实现wordcount原理

原则六:使用高性能的算子

除了shuffle相关的算子有优化原则之外,其他的算子也都有着相应的优化原则。

使用reduceByKey/aggregateByKey替代groupByKey

详情见“原则五:使用map-side预聚合的shuffle操作”。

使用mapPartitions替代普通map

mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!

使用foreachPartitions替代foreach

原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。

使用filter之后进行coalesce操作

通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。

使用repartitionAndSortWithinPartitions替代repartition与sort类操作

repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

原则七:广播大变量

有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。

在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。

因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

广播大变量的代码示例

// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

原则八:使用Kryo优化序列化性能

在Spark中,主要有三个地方涉及到了序列化:

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
  • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
  • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原则九:优化数据结构

Java中,有三种类型比较耗费内存:

  • 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
  • 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
  • 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。

但是在笔者的编码实践中发现,要做到该原则其实并不容易。因为我们同时要考虑到代码的可维护性,如果一个代码中,完全没有任何对象抽象,全部是字符串拼接的方式,那么对于后续的代码维护和修改,无疑是一场巨大的灾难。同理,如果所有操作都基于数组实现,而不使用HashMap、LinkedList等集合类型,那么对于我们的编码难度以及代码可维护性,也是一个极大的挑战。因此笔者建议,在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。

资源调优

调优概述

在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。很多Spark初学者,通常不知道该设置哪些必要的参数,以及如何设置这些参数,最后就只能胡乱设置,甚至压根儿不设置。资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。总之,无论是哪种情况,都会导致Spark作业的运行效率低下,甚至根本无法运行。因此我们必须对Spark作业的资源使用原理有一个清晰的认识,并知道在Spark作业运行过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。

Spark作业基本运行原理

Spark基本运行原理

详细原理见上图。我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。

因此Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。

task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。

以上就是Spark作业的基本运行原理的说明,大家可以结合上图来理解。理解作业基本原理,是我们进行资源参数调优的基本前提。

资源参数调优

了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。

num-executors

  • 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
  • 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

  • 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
  • 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

executor-cores

  • 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
  • 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

driver-memory

  • 参数说明:该参数用于设置Driver进程的内存。
  • 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

spark.default.parallelism

  • 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
  • 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

spark.storage.memoryFraction

  • 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
  • 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
  • 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

资源参数参考示例

以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

写在最后的话

根据实践经验来看,大部分Spark作业经过本次基础篇所讲解的开发调优与资源调优之后,一般都能以较高的性能运行了,足以满足我们的需求。但是在不同的生产环境和项目背景下,可能会遇到其他更加棘手的问题(比如各种数据倾斜),也可能会遇到更高的性能要求。为了应对这些挑战,需要使用更高级的技巧来处理这类问题。在后续的《Spark性能优化指南——高级篇》中,我们会详细讲解数据倾斜调优以及Shuffle调优。

原文出自:https://tech.meituan.com/archives

Spark性能优化指南——高级篇

美团点评阅读(2189)

前言

基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为《Spark性能优化指南》的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题。

数据倾斜调优

调优概述

有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。

数据倾斜发生时的现象

  • 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。

  • 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。

数据倾斜发生的原理

数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。

数据倾斜原理

如何定位导致数据倾斜的代码

数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

某个task执行特别慢的情况

首先要看的,就是数据倾斜发生在第几个stage中。

如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个stage;如果是用yarn-cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。

比如下图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。精准推算stage与代码的对应关系,需要对Spark的源码有深入的理解,这里我们可以介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。

这里我们就以Spark最基础的入门程序——单词计数来举例,如何用最简单的方法大致推算出一个stage对应的代码。如下示例,在整个代码中,只有一个reduceByKey是会发生shuffle的算子,因此就可以认为,以这个算子为界限,会划分出前后两个stage。

  • stage0,主要是执行从textFile到map操作,以及执行shuffle write操作。shuffle write操作,我们可以简单理解为对pairs RDD中的数据进行分区操作,每个task处理的数据中,相同的key会写入同一个磁盘文件内。
  • stage1,主要是执行从reduceByKey到collect操作,stage1的各个task一开始运行,就会首先执行shuffle read操作。执行shuffle read操作的task,会从stage0的各个task所在节点拉取属于自己处理的那些key,然后对同一个key进行全局性的聚合或join等操作,在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子之后,就计算出了最终的wordCounts RDD,然后会执行collect算子,将所有数据拉取到Driver上,供我们遍历和打印输出。
val conf = new SparkConf()
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.collect().foreach(println(_))

通过对单词计数程序的分析,希望能够让大家了解最基本的stage划分的原理,以及stage划分后shuffle操作是如何在两个stage的边界处执行的。然后我们就知道如何快速定位出发生数据倾斜的stage对应代码的哪一个部分了。比如我们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,判定stage1出现了数据倾斜,那么就可以回到代码中定位出stage1主要包括了reduceByKey这个shuffle类算子,此时基本就可以确定是由educeByKey算子导致的数据倾斜问题。比如某个单词出现了100万次,其他单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。

某个task莫名其妙内存溢出的情况

这种情况下去定位出问题的代码就比较容易了。我们建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn-cluster模式下的log中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有shuffle类算子,此时很可能就是这个算子导致了数据倾斜。

但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

查看导致数据倾斜的key的数据分布情况

知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决。

此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:

  1. 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
  2. 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

举例来说,对于上面所说的单词计数程序,如果确定了是stage1的reduceByKey算子导致了数据倾斜,那么就应该看看进行reduceByKey操作的RDD中的key分布情况,在这个例子中指的就是pairs RDD。如下示例,我们可以先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

数据倾斜的解决方案

解决方案一:使用Hive ETL预处理数据

方案适用场景:导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。

方案缺点:治标不治本,Hive ETL中还是会发生数据倾斜。

方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。

项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6倍以上。

解决方案二:过滤少数导致倾斜的key

方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。

方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

方案缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。

方案实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。

解决方案三:提高shuffle操作的并行度

方案适用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。

方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。

方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。具体原理如下图所示。

方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。

方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。

方案实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

解决方案四:两阶段聚合(局部聚合+全局聚合)

方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。

方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

// 第一步,给RDD中的每个key都打上一个随机前缀。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 第二步,对打上随机前缀的key进行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// 第三步,去除RDD中每个key的随机前缀。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// 第四步,对去除了随机前缀的RDD进行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

解决方案五:将reduce join转为map join

方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。

方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。具体原理如下图所示。

方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。

方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

// 首先将数据量比较小的RDD的数据,collect到Driver中来。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
// 可以尽可能节省内存空间,并且减少网络传输性能开销。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// 对另外一个RDD执行map类操作,而不再是join类操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for(Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // 获取当前RDD数据的key以及value。
                String key = tuple._1;
                String value = tuple._2;
                // 从rdd1数据Map中,根据key获取到可以join到的数据。
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });

// 这里得提示一下。
// 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。
// 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。
// rdd2中每条数据都可能会返回多条join后的数据。

解决方案六:采样倾斜key并分拆join操作

方案适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

方案实现思路:

  • 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
  • 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
  • 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
  • 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
  • 而另外两个普通的RDD就照常join即可。
  • 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

方案实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。

方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

// 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
// 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }     
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair( 
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            } 
        });

// rdd2,就是那个所有key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }

        });

// 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。
// 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedUserid2infoRDD)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });

// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// 将倾斜key join后的结果与普通key join后的结果,uinon起来。
// 就是最终的join结果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

解决方案七:使用随机前缀和扩容RDD进行join

方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

  • 该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。
  • 然后将该RDD的每条数据都打上一个n以内的随机前缀。
  • 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
  • 最后将两个处理后的RDD进行join即可。

方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。

方案实践经验:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。

// 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 将两个处理后的RDD进行join即可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

解决方案八:多种方案组合使用

在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。比如说,我们针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次可以对某些shuffle操作提升并行度,优化其性能;最后还可以针对不同的聚合或join操作,选择一种方案来优化其性能。大家需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

shuffle调优

调优概述

大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则,千万不要舍本逐末。下面我们就给大家详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

ShuffleManager发展概述

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进。

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

下面我们详细分析一下HashShuffleManager和SortShuffleManager的原理。

HashShuffleManager运行原理

未经优化的HashShuffleManager

下图说明了未经优化的HashShuffleManager的原理。这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

我们先从shuffle write开始说起。shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

接着我们来说说shuffle read。shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

优化后的HashShuffleManager

下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

SortShuffleManager运行原理

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

普通运行机制

下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

bypass运行机制

下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
  • 不是聚合类的shuffle算子(比如reduceByKey)。

此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

shuffle相关参数调优

以下是Shffule过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
  • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

写在最后的话

本文分别讲解了开发过程中的优化原则、运行前的资源参数设置调优、运行中的数据倾斜的解决方案、为了精益求精的shuffle调优。希望大家能够在阅读本文之后,记住这些性能调优的原则以及方案,在Spark作业开发、测试以及运行的过程中多尝试,只有这样,我们才能开发出更优的Spark作业,不断提升其性能。

原文出自:https://tech.meituan.com/archives

全球互联网技术架构,前沿架构参考

联系我们博客/网站内容提交