Jay Kreps (Confluent CEO,Kafka 核心作者) 在《The Log: What every software engineer should know about real-time data’s unifying abstraction》中系统性描述了日志的价值和重要性,指出了日志特定的应用目标:它记录了什么时间发生了什么事情(they record what happened and when)。而这,正是分布式系统许多问题的核心。

背景

  • 年前无意间阅读公司MQ团队的一篇博文 消息队列价值思考,其中提到了本篇译作的原文,有种相见恨晚的感觉。原文讲解日志对于存储系统的关系、重要性,整个思路、思考对于我们做应用设计有非常大的借鉴价值,大致读完,发现这种文章与DDIA书籍都应该在本科学习数据库的阶段就学习掌握。因此即使网上已有公开的几篇翻译,但由于其重要性,特在自己的博客上记录、翻译一遍。
  • 原文写于2013年底
  • 原文作者 Jay KrepsDecember ,以下我们简称为JK。

Ref

译文

P0:背景

JK大佬在2013年的六年前加入了领英。彼时为何有趣?是因为当时 LinkedIn 正准备重构一个单体巨石并且是集中式的数据库,目标是转换为一系列定制的分布式系统。最后这段经历成功且有趣:JK以及 LinkedIn 构建、部署了分布式的图数据库、分布式的搜索后端、一个Hadoop集群,并且还有第一代第二代的KV存储。

整个过程学到的一个关键点:日志是系统设计、构建的核心。log日志还有一些别名:

  • 预写日志 write-ahead logs
  • 提交日志 commit logs
  • 事务日志 transaction logs

log日志,自在线系统、分布式系统存在起,就已经伴随我们左右。

重要性

如果不懂log日志,就无法掌握:

  • RDBMS
  • NoSQL
  • KV存储
  • replication 复制同步
  • paxos 一致性
  • hadoop
  • VersionControl 版本控制
  • 几乎所有软件系统

本文,JK会带领大家一起了解log日志的所有知识点。

P1:什么是log日志?

log日志是最简单的存储抽象。特点:

  1. 写入:只追加(AppendOnly);
  • 读取:从左往右;
  • 全局有序;
  • 每一行为一个entry,每一个entry记录一个唯一的、有序的编码(与时间对应);

log

由以上 3 4 可得,越靠左的entry写入时间越早,因此第4点的编码与时间对应由此而来。以写入顺序来映射时间看起来有点奇怪,但是到后面我们研究分布式系统时,这项特性价值会越发明显。

日志记录的内容、格式在本文来说不太重要。同时注意,我们不能一直追加记录到文件中,因为空间资源是有限的。这一点后面会聊到。

日志与表、文件区别不大。表的本质是一组数据记录,而文件的本质是字节集合。日志则是记录按时间排序的表或者文件

这么简单的东西,有必要写一篇文章来讲解吗?一个AppendOnly的有序记录文件与数据系统有何关系?

答案是,log日志与分布式系统有一点核心是一致的,他们解决的都是:记录了何时发生了什么(they record what happened and when)

有一点内容需要澄清强调:本文所说的日志,与应用日志不是一回事。

  • 应用日志一种非结构化的错误信息、追踪trace信息,一般使用SyslogLog4j输出到本地文件。
  • 应用日志是本文所指log日志的退化表现。
  • 应用日志一般叫text log,主要用作程序员阅读、排错。而本文所指的log一般叫journal | data logs,一般用作程序解析。

大致想想,应用日志其实是一种时代错误(anachronism),当服务变多系统变复杂后,应用日志逐渐变得失控。而基于这种非结构化的日志想做一些解析的工作真是难上加难。

P1.1:数据库中的日志

日志概念过于简单,简单到我们都不好叫它为一种发明,其最早跟随 IBM System R 出现。最开始日志在数据库中用于崩溃时保证不同数据结构、索引的同步。为了保证原子性、持久性,数据库在应用变更前会先记录要改变的信息到日志中。

而表、索引则是经过设计的一种数据结构的映射。

随着时间发展,log日志最开始是一种ACID的实现,变为了数据库多节点间复制的实现。到这里,日志中记录的序列号成为了保持多节点同步的重要依据

Oracle, MySQL, PostgreSQL 都有对应的主从同步的传输协议。 比如Oracle的XStreams and GoldenGate 为非Oracle的系统提供订阅机制。MySQL, PostgreSQL类似。

由于这种根源,最开始基于日志订阅的机制只局限在RDBMS中,巧合下才兴起。基于日志的订阅这种抽象非常适合用于:

  1. 消息服务;
  2. 数据流;
  3. 实时数据处理;

P1.2:分布式系统中的日志

日志解决了分布式系统中的两个核心问题:

  1. 数据有序变更(对应数据更新时可对顺序达成一致);
  2. 分发数据(对应拷贝数据时就修改的结果可达成一致);

分布式系统中基于日志的处理方式源于状态机复制原理: 如果两个确定的进程,从相同的状态开始,并且以相同顺序进行了相同的操作,其输出一致,多个进程的最终状态一致。

Deterministic 确定性 意思是处理过程与时间无关、不受其他外部因素影响。 举几个反例:

  1. 多线程执行顺序不同,结果不同;
  2. 调用时间相关函数:gettimeofday
  3. 不可重复的操作(非幂等接口重复调用); 这些都是不确定的。

而进程的状态指的是,当操作结束后,留在机器上的任意数据(内存或者磁盘均可)。

我们上面讲了,日志可以保证相同的顺序+相同的输入=相同的输出。这个要成为我们的直觉(启发式、公理): 相同日志输入+确定性的处理代码逻辑=相同的输出。

再进一步,如何应用到分布式计算的场景中?我们可以进行归约(reduce): 转换问题 多个机器做同样的事 为 实现分布式持久化日志,以供分布式进程消费。

此处日志作为统一的输入,将不确定性排除的同时,保证所有订阅了日志的副本(replica)消费都是同步的。

理解了这里,也就理解了分布式系统设计中最通用的一个准则:确定性的操作带来确定性的结果

这里比较奇妙的一点在于,log日志记录的时间戳序号,现在可以表示每个节点的状态。每个节点记录当前已处理的最大项的时间戳,时间戳加上日志唯一确定节点完整的状态。

根据日志记录内容的不同,我们可以有不同的原理应用方式。比如,我们可记录服务到访的请求、服务处理请求之后的状态变化、执行过的命令,甚至可以记录一系列机器指令、方法名、参数。只要两个进程以同样的方式处理同样的入参,多个副本间的结果是一定一致的。

不同群体对日志有不同的叫法。搞数据库的一般把日志分为物理日志+逻辑日志。

  • 物理日志(physical logging):记录每个改变的行。
  • 逻辑日志(logical logging):记录DML语句。

分布式系统一般有两种数据处理、复制的方案:

  1. 状态机模式(state machine model == active-active model):请求会写入一份日志,然后每个副本处理同步;
  2. 主备模式(primary-backup model == active-passive model):选举一个副本为主节点(leader),主节点按请求到达顺序处理写请求,之后通过日志广播变更,然后其他副本执行同步追赶进度。如果主节点遭遇故障进入选举流程。

nodes_process_replicate_model

举例说明:假如现在服务端依次执行指令:+1*2

  1. 状态机模式:每个副本各自执行两条执行;
  2. 主备模式:主节点执行+1*2,日志中记录结果,副本节点依次同步变更;

所以很明显,如果顺序不同,结果也会不同

分布式日志可以视为共识问题(consensus)的一种建模后的数据结构。 日志:表示针对下一个值是什么的一系列决策。参考Paxos家族的算法,使用日志是最实用的手段。Paxos一般用一种叫做multi-paxos的协议来对一系列共识问题建模,日志中的每个槽(slot)都这样来处理。 同类型的协议还有:

关于历史理论的发展JK有一些质疑:也许分布式计算的理论研究有些超过了现实的应用。现实中共识问题往往过于简单了。计算机系统很少只用去处理单个值,一般都需要处理一系列的请求。所以log日志,不单单是一个单个简单值登记册,而更是一种自然的抽象。

更进一步的考虑,JK认为过度关注算法以至于我们忽视了分布式系统底层的日志抽象。未来,日志可能更有可能作为一种通用的接口而不是具体的实现被我们所使用。就像hashtable一样,我们一般使用getput,很少去关心底层线性表、hash的具体实现。

P1.3:Changelog101:表与事件是对偶的

回到数据库,一个带有变更的日志与表是对称的。 日志类似于一个拥有借款、赊账的银行,而数据库则表示银行卡余额。

  1. 正向:如果我们有变更记录日志,就可以以此执行变更,最终得到一个table表。表记录的是一段时间内(日志处理时间段)某个key对应的最终状态。
  2. 逆向:如果表发生了更新,我们可以记录变更,并且发布到changelog中。changelog是我们支持近实时(near-real-time)复制的原材料!

yin_yang

综上,我们的表与事件是二元对称的:

  1. 表维护数据;
  2. 日志记录变更; 如果日志记录的是完整变更,那根据日志不仅可以还原最新版本的数据,也可以回退到任一时刻的状态。所以日志是表所有状态的备份依据。

听起来日志跟我们平时使用的代码版本控制类似了。版本控制解决的只是分布式系统解决的一部分问题:管理分布式、并发的修改

版本控制系统说白了是将版本记录建模为一系列有序的补丁(models the sequence of patches),这其实就是log日志。而每次我们本地检出的代码快照(checked out “snapshot” ),其实就是表的概念。分布式系统中发生变更时,副本节点需要执行复制(replicate),而在版本控制中发生变更后,我们需要先拉取补丁(拉取日志)然后应用变更到当前快照,这其实就是replication的过程。

工业界的一个实例是 Datomic ,这家公司销售一种基于日志的数据库。理念与我们上面表达的一模一样。

目前为止,我们讲的基本都是理论,别急,马上要进入实战环节了!

P1.4:接下来是啥?

本文接下来的部分,JK主要想带领大家一起过一下分布式计算的核心原理,以及分布式计算的抽象模型。分成如下几个部分:

  1. 数据集成:旨在让所有系统(存储或者计算)都能很方便地访问任一系统的数据;
  2. 实时数据处理:计算派生数据流;
  3. 分布式系统设计:工业界如何把分布式系统的核心简化为log日志;

每个部分中,使用日志的过程无非是在演绎这句话:生成一个持久化的、可重放的历史记录。而分布式多机背景下,无非就是多台机器各自按确定性的方式、一定的速率来执行这个重放日志。

P2:数据集成

本节结构:

  1. 探讨什么是数据集成;
  2. 数据集成的重要性;
  3. 如何关联到log日志;

数据集成的目标是使得我们内部所有的数据在任意一台服务与系统都可以被访问到。

数据集成这个词不太常见,以我们常见的ETL为例,作为数据集成的子集,ETL落地为了一种关系型数据仓库。如果ETL抽象的足够通用,就可以覆盖实时系统、流处理的场景。

搞大数据的一般没怎么讲过数据集成,但是在「让我们的数据可用」这件事上,大家的关注点是一致的。

工业界对于数据的使用符合马斯洛需求理论。

  • 首先最基础的需求是抓取数据,放入一个可用的处理环境中(可以是实时查询系统、文本、python脚本);
  • 其次是将数据建模抽象为易读易处理的格式;
  • 最后才是考虑如何在存储组件之上如何处理数据(MapReduce、实时系统);

所以数据平台建设的正确步骤应该是:

  1. 抓取数据,搭建处理平台;
  2. 抽象数据建模,提供更容易理解的语义;
  3. 更复杂的处理:可视化、报表、算法处理、数据预测;

现实中,大家往往是背其道而行。所以,如何建设可靠的数据流系统,是最重要的事

P2.1:数据集成的两个复杂点

行业发展带来了两个有复杂度的场景:

  1. 事件流数据
  2. 定制数据系统

P2.1.1 事件流数据

简单来讲就是各种记录性质的埋点数据、监控数据(事件、指标)。即trace、log、metric。

科技公司由于信息化做得充分,所以这类基础搞起来趁手,而传统一点的公司也有很多这方面的应用,并且随着信息化发展,事件型数据会越来越多。

相比业务数据,监控埋点数据量级高出不少,所有有其处理难度。

P2.1.2 定制数据系统

第二类数据集成的难点场景则是定制的各类数据系统,比如我们需要为团队单独定制OLAP、搜索、批处理、图分析各类系统。

多种系统共存数据带来了挑战。

P2.2:结构化日志 数据流

日志是处理多系统间公用数据的自然抽象。使用方式很简单:将日志作为多个系统实时订阅的中心化存储。

log_subscription

如上图,每个逻辑上的数据源都能抽象出自己的日志模块。数据源本质上就是为了记录事件(events-clicks-pv),而数据库表是接收变更结果。下游的每个订阅系统从日志中尽快读取数据,将每一行新记录写到自己的存储中,并且向前挪动消费位置(offset)。这里的订阅者(消费者)可以是任意一种系统:缓存、hadoop、数据、搜索系统。

P2.2.1 逻辑时钟

上例中,日志作为一种逻辑时钟,订阅者根据这个可以算出自己的消费进度(状态、位置),offset表达了自己在消费维度上的时间进度。

还是举个实际例子来辅助理解: 假设我们有一个数据库+若干个缓存系统。使用日志可以帮助我们进行数据同步以及推出每个缓存系统处理的时间。 首先我们写入一个X值,接着需要从缓存中读到该值。如果需要保证使用方不能读到过期数据,只需要保证不从「还未消费到X值位置的缓存节点」读数据即可。

P2.2.2 buffer缓冲

log日志还能做数据消费的缓冲buffer。 特别是同一个系统中有多个不同的消费者,并且大家消费速率不一致时,这种设计非常好用。 好处:

  • 订阅系统可以宕机、挂机维护,等重启时,消费过程可以继续;
  • 订阅者以自己的节奏进行消费;
    • 批处理系统比如hadoop或者数仓可以一小时或者一天消费一次,而实时系统则需要在秒级别消费;
  • 消费者只需要与日志系统交互;
    • 消费者可以自由上下线而不影响上下游;
    • 消费者不需要关心数据存在哪里,只需要跟日志保持一致的通信协议;

Count Leo Tolstoy :每个好的数据系统看起来都像是日志的架构;而坏的数据系统则各有各的样子。

上面我们使用了日志log的订阅,而不是消息系统的发布订阅(pub sub),原因在于消息系统表达的与日志并不完全一致,我们可以把日志当做可以保障持久性以及强有序的消息系统。在分布式理论中,我们称之为原子广播(atomic broadcast)。

到此,JK还是得强调,日志仅仅是基础设施。距离我们掌握数据系统还有一段路要走,接下来,我们会关注:

  • metadata 元数据;
  • shemas 数据格式;
  • compatibility 适配性;
  • 以及处理数据结构的细节以及演进过程;

P2.2.3 在LinkedIn领英公司的实践

回忆下我们的背景部分,JK加入公司后,经历了一系列从单体到分布式的架构重构。

当前领英核心的系统包括:

每一个都是特定场景下的定制系统。从JK来到领英,以日志为核心的数据流系统设计思想就贯穿始终,其中我们有一个最基础的设施服务叫做 databus ,它就是我们的日志抽象实现,最开始用于广播Oracle表变更,以满足下游数据消费的需求。

最初的背景:JK最开始在2008年的时候参与实现一套KV存储,而下一个任务是要把我们的一些推荐系统移植到hadoop体系中。 由于经验不足,在这个项目上踩了一些坑:

  1. 最开始计划是直接从Oracle导出数据,但是发现:
    1. 这个导出太恶心了;
    2. 报表处理时间比较久(出错就GG);
    3. 处理过程不可逆;
  2. 所以后来抛弃了从Oracle直接导数的做法,我们直接以日志为数据源,并导入到第一代KV存储 Voldemort 中 ;
  3. 在这种导入导出的工作中,工程师的时间大大被浪费,而且中间一旦出错,你就得想办法重新处理;
  4. 尽管我们搞出了这么一套通用的方案,但是每一个数据源都需要单独配置,这个工作量不小,而且容易出错。接入方越来越多,每个系统都想要接入各种各样的系统来满足各种奇怪的需求;

到此,我们来分析一下,问题到底出在哪里?

  1. 我们构建的管道流系统,有点乱,但很有价值;
    1. 以前难接入的计算现在变易用了;
    2. 很多新产品、分析工作,只需要接入新的数据源即可;
  2. 数据管理更强有力的支持可以帮忙我们构建可靠的数据。如果我们支持了所有的数据结构,Hadoop就可以全自动加载数据,无需人工介入。及时schema变更也可以自动跟进。
  3. 目前覆盖的数据场景还不全。以领英为例,数据场景太多了,想要全部支持难度也不小。

知道了问题,我们结合一下现状。按照我们为每个数据源与消费者定制的思路,其实是可行的,如下图。 这种做法的问题在于:为每个项目都得定制一个数据生产者与消费者。整体实施的复杂度是O(n^2)

datapipeline_complex

将上面这种架构进行优化: datapipeline_simple

这样就是一个O(n)的解决方案。核心思想: 将生产与消费者隔离,通过中间的Unified Log做一层抽象,单点服务只需关注如何做集成,而无需关注对接方如何处理数据。

说白了,Kafka就是这样诞生的!彼时亚马逊也有类似于Kafka的产品:Kinesis。架构类似:

  • 作为一个管道服务,连接其他的分布式系统(都跑在EC2机器上)
    • DynamoDB
    • RedShift
    • S3

P2.2.4 ETL与数仓的关系

关于数仓,其核心概念是作为一种干净可集成的数据结构,用于业务分析。 关键流程:

  • 定期从源数据库提取数据 E Extract
  • 把转换成一种易用的格式 T Transform
  • 将上一步的产出加载到统一的中心化数仓 L Load

综上,数仓是组织内宝贵的数据资产持有者。数据价值可用于业务分析、处理。

数仓比较适用于一些报表、整合指标(counting、aggregation、filtering),但团队仅仅只有数仓不太好满足一些实时业务、搜索、监控等需求。

JK觉得ETL关乎两个关键:

  1. 数据提取、清晰,将数据从特定系统解绑抽离;
  2. 为查询、使用重构数据(比如为了更好地适应RDBMS,需将数据转为星型、雪花型schema,而为了更好适应聚合,需将数据转为列式存储); 但是耦合这两步是个问题。我们系统中的数据应该能适应各种实时、报表、搜索场景。

很多时候数仓团队自己去处理清洗数据,但理解业务的往往是业务团队。

  • 目标没有对齐;
  • 业务不够熟悉;
  • 需求响应不够快 导致的结果就是数据覆盖不够全面、数据问题多、需求响应慢。

更好的方式是:

  1. 数仓团队只管理中心化的日志系统(同时配备操作数据的接口);
  2. 而数据的生产方去考虑如何转换、清洗数据格式;
  3. 消费方通过接口拉取日志中的统一数据;

pipeline_ownership

团队中考虑数据可拓展性是非常常见的:

  1. 有时候我们需要在已有系统上增加搜索功能,比如全文检索;
  2. 有时候需要增加实时监控、告警;

这些需求在传统数仓、Hadoop集群环境下并不好支持,而ETL技术栈更无能为力。但是如果有了上图这种系统:有完善统一的数据结构、并且支持数据访问,那么增加上下游的能力就简单多了。

这种理想化架构的特点:

  1. 在往日志写数据前,可以进行数据清洗; 因为这一步的操作人最清楚数据需要如何被处理,并且这里的操作应该是无损、可逆的。
  2. 对日志可以进行实时转换,产生新的日志; 而一些添加新值一类的转换,则可以基于日志统一在业务处理之后进行处理。比如为一些事件类型数据增加session字段;或者为一些通用工鞥那增加衍生字段。
  3. 在目标系统中可以处理一些加载逻辑; 只有与消费者这里相关的聚合才适合放到加载阶段处理,比如需要把数据转为星型、雪花型的schema(支持报表、分析场景)。

以上,是经过分析后的良好的ETL架构。

P2.2.5 日志与事件驱动

上述架构除了一些正向的好处,也带来了一些附赠的可能性:

  1. 系统解耦;
  2. 事件驱动;

一般在web技术栈中我们通过记录日志来往数仓或者hadoop技术栈输送数据,以供后续的聚合分析、查询使用。这种做法的问题与上面我们分析ETL系统一致:就是我们整个数据流程耦合了数仓、进程调度、业务逻辑。

而在领英,我们基于日志中心构建了一套事件处理的架构。首先以Kafka为中心,多个订阅方消费事件日志。由此,我们定义了上百种事件类型,每种类型都捕获对应的事件数据。这套架构覆盖了公司内用户画像、搜索、服务调用以及异常等业务场景。

下面举例说明帮助大家理解一下上述架构的优势: 场景是一个展示JD岗位的页面,本来job页面应该只描述岗位相关信息,但是由于各种动态化的需求,慢慢地这个页面上会加上各种逻辑,比如:

  1. 我们需要离线分析JD数据;
  2. 需要针对PV做count统计分析;
  3. 需要针对一些访问聚合统计结果,以供发布者了解访问走势;
  4. 需要记录用户访问记录,以此确保给用户的推荐不会重复;
  5. 需要跟踪JD页面的浏览数据,以此衡量这个岗位的流行程度;
  6. 等等。

按照这个发展,我们的一个展示岗位的页面变得越发复杂。 往往,我们的端也是多样的:手机移动端、wap端、web端,端也提高了复杂度。而当一个新人需要加新功能,那他可就头大了。而现实中、生产环境中,问题只会比这个更复杂。

「事件驱动」架构可以简化这种场景的复杂性。 岗位展示页面只展示岗位信息,同时记录关联属性、查看用户。 而其他下游、对接系统(推荐、分析、数仓)只接入、消费日志,下游的处理与核心业务系统解耦,这样业务之间完全不影响,复杂度维度也降低了。

P2.2.6 构建可拓展的日志系统

将发布者与订阅者解耦并不是啥新鲜事,但是在这其中,如果想搞一个支持多订阅、实时的日志系统,支持大规模拓展还没那么简单。

大家一般会认为分布式日志是一种比较慢、重量级的实现(像ZK只负责存储元数据metadata),但如果想支撑大数据量的流式场景,日志的实现必须要快、轻(可以接入各种数据)。在领英,使用Kafka单日处理的数据量级达到了600亿的级别(算上数据中心之间的同步,单日写入量就上千亿了)。

Kafka的「秘诀」如下:

  1. 日志分区(分片);
  2. 通过批量读写优化吞吐;
  3. 避免无用拷贝(零拷贝 zero-copy);

比如这就是分区日志: partitioned_log

  1. 每个分区都是有序的,但是分区间不保证顺序(除非业务层维护了顺序比如时间字段)。
  2. 数据写入到日志分区由生产者控制,一般通过业务字段路由到对应分区。
  3. 这种写入机制不需要协调多个节点,因此可以根据业务规模线性拓展集群。
  4. 每个分区可配置一定数量的副本节点,副本会同步追赶主节点的进度,当主节点挂掉时,副本们会选举出一个新的主节点。
  5. 不保证分区间的全序可能有点限制,但是问题并不大。因为log日志流的场景下,我们的节点往往有成百上千个,所以保证全序并没有那么要紧。像Kafaka保证的语义:从单个生产者发出的消息会以发出的顺序进行处理。
  6. 日志与文件系统类似,针对线性读写有很多通用的优化思路,比如日志可以将小的读写操作合并为大的操作,以此提高吞吐。Kafka将这个理念发挥到了极致,它的批量场景:
    1. 生产者像broker发数据时;
    2. 写磁盘时;
    3. 多节点间的数据复制时;
    4. 消费者拉取数据时;
    5. ack提交响应时;
  7. 而在内存log、磁盘、网络传输中,Kafaka仅仅使用最简单的二进制格式,所以很方便使用零拷贝(zero-copy data transfer)。

如此一来,我们的读写效率发挥到了极致,在内存超大的情况下,磁盘与网络IO也能打满。

本文不过多讲解Kafaka,更多细节还请大家自行参阅 http://sites.computer.org/debull/A12june/pipeline.pdf 以及 Kafaka设计文档

P3:日志与实时流处理

到此,我们主要探讨了在系统、节点间复制数据的各种操作,但这还远不是分布式系统的全貌。下面,我们探讨下流处理,日志≈流处理(http://highlyscalable.wordpress.com/2013/08/20/in-stream-big-data-processing/)。

P3.1 什么是流处理?

JK认为传统的对流处理的理解是不对的,他认为流处理与SQL、实时处理毫无关系。

JK对流处理的定义:

  1. 进行持续数据处理的基础设施;
  2. 数据计算的模型可以抽象为像MapReduce类似的分布式处理框架,同时需保障低延迟;
  3. 数据处理的模型往往取决于数据收集的模型,比如批量收集一般就批量处理,而持续采集的数据一般会持续处理;
  4. 以1790开始的美国人口普查为例,当新一轮的普查开始后,工作人员挨家挨户进行调查、填表,收集完表格之后进行汇总;这个例子背景有点久远,但是与批量处理的模型是一致的;

在领英内部,数据处理完全不用批处理。分析业务场景,发现只有两类数据:

  1. 活动、事件、监控类;
  2. 数据库事务类;

这种业务天生就是持续处理的。反过来想想为什么会有批处理?往往是因为数字化程度(自动化、信息化)不够!系统中有一些人工介入的操作,而且人工操作必定是有停留时段的(数据产生到处理结束有空窗期,想想A跟B通过邮件完成一个流程节点需要多久?)。

而我们线上的很多定时任务,本质上是一种固定时间窗口的持续处理任务。整理下JK对流处理的认知:

  1. 以一定时间维度,处理数据;
  2. 不要求使用静态数据快照,支持用户自定义处理频率;
  3. 流处理是批处理的泛化抽象(在实时业务场景下,这个意义很大);
  4. 很多公司的实时数据收集没做好,这个影响了后续流处理的成效;
    1. 比如很多业务形式是面向文件、定期处理的,这个是与实时数据脱轨的;
    2. 在领英早期,很多数据也是以小时的维度收集的,接入流处理的时候,系统优势体现不明显,这也是很多系统常见的问题;
    3. 而在金融领域,流处理系统能大放异彩的原因就是,金融系统建设完善,实时数据早就建设好了;
  5. 流处理系统弥补了实时业务与离线批处理业务之间的gap;
  6. 而log日志,解决了流处理中的很多关键问题;可以参考开源的 Samza ,其中有很多以上设计的实现细节;

P3.2 数据流向图

dag

流处理最有意思的一点并不是流处理本身,而是关于我们在上个大部分(数据集成)中所讨论过的「什么是数据流」:feeds或者log日志本质上就是事件+多个系统间流转生产出的数据。流处理可以处理一个feeds之外的数据,而每一个feeds或者数据流在内部封装了其复杂度。

拓展一下思路,任何一个从日志读数据并且将结果写入到其他系统的模式,我们都可以称之为流处理任务。log日志作为中枢节点连接了多个系统,如此一来,以日志为核心,你可以看到团队整个架构内所有的数据抓取、转换、流向。

而流的处理要求并不高,你可以采用任意的框架、技术栈,如果有更完备的基础设施来帮助调度节点、流程,那自然更好。

P3.3 日志集成系统的两大优点

  1. 支持了单一数据集之上的多订阅者,保障了有序性。 回想下我们上面提到副本复制时有序的重要性!日志同时保障了持久性,支持了容错(崩溃恢复)。
  2. 日志是天然的进程间的buffer。 在「发布订阅」模式下,很多时候数据生产者比消费者处理更快,如果以非同步的方式数据,这种情况下当前进程会阻塞,接下来我们可以选择:
    1. 使用buffer暂存数据;
    2. 丢弃数据; 如果业务不能容忍丢数据,那利用日志作为buffer是很好的选择,日志一方面够大,一方面保障持久性,并且与数据流中其他节点都解耦(不直接依赖下游)。

Storm + Samza 都基于上述理念设计,也都支持使用Kafka作为日志组件。

P3.4 有状态的实时处理

很多事件数据流处理其实是无状态的,复杂的玩法主要是计数统计、聚合、连表操作。 但有时候我们基于事件需要记录一些状态数据,比如一个点击用户的userId。如果处理节点挂掉,我们怎么保证状态数据不丢失?

首先最简单的做法当然是在内存中维护userId,但是进程挂掉,数据也就丢了。数据在内存中存活的窗口期越久,数据丢失的时间维度也就越长。

另一种方式是依赖某个远程的存储服务,通过网络调用。但是会增加网络IO的开销,以及数据损失了本地的局部性(局部性原理!!!)。

不禁发问:如何按照我们处理的流进行分区,并且实现一个类似表的东西?

回想下上面提到的表与日志的对称性!本地日志就可以记录状态变更,而语句的执行结果就是一个表。同时表保证容错性。

表或者索引存储了状态,举几个例子:

  • bdb
  • leveldb
  • Lucene
  • fastbit

通过changelog记录了状态变更,在系统崩溃或者重启时可作为恢复的依据(基于时间有序、增量记录)。 这种机制其实是很通用的抽象:通过日志作为分区,记录状态,根据入参决定不同的日志形成各自的分区。 状态自己就是日志,那其他下游的处理者订阅日志即可。

当有数据集成的需要时,上述这个模型就非常实用。从原始数据库我们提取出一份changelog,以下游需要的方式进行index索引处理。

更多细节可以参考 Samza

P3.5 日志压缩(Log Compaction)

log_compaction_0

保留完整的日志需要很大的空间,而系统的空间资源是很宝贵并且有限的,所以,我们需要压缩日志。

我们以Kafka为例,分两种情况:

  1. 带key值的更新; 由于可重放的特性,可以进行重复key的压缩。
    1. 丢弃老旧淘汰数据;
    2. 合并更新同一条数据的记录,例如 +1与+2合并为+3;
    3. 这个特性为 log compaction
  2. 事件数据; 配置化保留一定周期(时间窗口)或者一定大小(内存范围)(周期、数据条数、文件大小)。

P4:系统设计、构建

最后一个话题:日志之于存储,也对我们(工程师)在做在线系统设计时有很大的借鉴意义。

在线系统是我们最常见的业务应用形态,所以这点非常重要。

P4.1:分布式存储

首先日志在分布式数据流(data flow inside a distributed database)、大规模的数据集成(data integration in a larger organization)中,作用类似:

  1. 统一抽象数据流表示(日志);
  2. 保证数据一致性;
  3. 崩溃恢复,保证数据持久性;

然后,我们发现数据流系统与数据集成系统都可以看做是一个分布式存储系统。

  1. 面向查询的系统(Redis, SOLR, Hive tables)无非是在我们数据之上添加index索引;
  2. 面向流处理的系统(Storm or Samza)无非是开发好的触发器、视图;

所以不同的数据系统,无非是不同的索引机制、类型。

同时我们回溯下数据系统发展的历史,最开始的时候我们只使用一个RDBMS,比如MySQL或者Oracle(讲真,我最开始参加工作的时候就是用一个Oracle处理所有需求,那还是2016 2017年),当所有数据都放在同一个数据库里的时候,我们的在线事务处理、统计分析、批处理统统在一个存储里完成(都是SQL),这个时候,其实根本不存在数据集成的需要。 而随着业务规模、业务形态的发展,我们慢慢产生了拆分数据的需求:

  1. 系统拓展;
  2. 地理因素(比如不同国家地区有不同业务需要);
  3. 安全性;
  4. 性能隔离

这些是拆分数据、系统最迫切的因素。这种时候,单个系统不好满足所有场景的需求,所以数据集成的复杂度、需求,是跟随业务规模、形态早就存在的。而使用一个大的Hadoop集群完成所有业务形态数据的存储,其构建复杂度是很高的。随着工业界近些年的演变,好的设计应该是这样的:

  1. 在多个集群中,调度编排多个服务实例;
  2. 单个服务实例不必完成所有功能,如安全性保证、性能隔离、良好拓展性;
  3. 集群服务中保证单个问题都有对应解决方案;

将单个大而全的服务拆分为上述各个小的服务,也减低了我们落地实施的成本。同时各服务(存储)内职责更明确,设计目标也更加聚焦。

JK根据已有经验,对数据系统未来发展做出了几个预想:

  1. 保持现状,这种情况下数据集成会留有一定复杂度,因此,一个外部可集成数据的日志系统就很重要。
  2. 一个大而全的超级系统,这个明显不符合工程最佳实践
  3. 第三种设想对工程师来说是最有吸引力的,所有参与其中的服务、系统都是开源的。开源将我们使用的服务、系统进行了彻底的解耦。这一点可以参考目前Java技术栈:
    1. Zookeeper负责分布式服务的协调调度(得益于更高层抽象的 Helix Curator);
    2. Mesos Yarn 负责进程虚拟化、资源管理;
    3. 三方库 Lucene LevelDB 负责索引构建;
    4. Netty Jetty 以及封装库 Finagle rest.li 负责远程通信;
    5. Avro Protocal Buffers Thrift umpteen zillion 负责序列化;
    6. Kafka Bookkeeper 负责日志挤压流转(backing log);

总结第三种方式,本质上就是职责分离、高内聚低耦合、组合模式、DC(Divide and Conquer)分治思想。积木式服务化的架构非常利于我们做系统演进,并且由于其灵活性、可靠性,我们的实施成本变低了很多。而分治思想则将大问题拆解为了小问题,小问题无疑更容易解决。

P4.2:日志在系统架构中的角色

一个带有外部日志的服务,可以将一些通用的复杂性交给日志模块来处理,比如:

  1. 通过将多节点接收的并发修改序列化(sequecing),来达成数据一致性(实时的、最终的);
  2. 提供多节点间的数据复制功能(replication);
  3. 对数据写入方提供写入语义(commit),比如当应用可以保证数据不丢时返回一个ack响应;
  4. 面向系统提供基于外部日志的订阅流(subscription feed);
  5. 保障崩溃恢复、节点数据追赶(restore failed replicas or bootstrap new replicas);
  6. 支持多节点间的负载均衡(rebalancing);

写到这里发现,这不就是一个分布式系统的职责与实现吗?剩下的,无非就是查询与索引。而查询与索引在不同系统间会有不同的设计,比如ES(基于Lucene构建索引)在做查询的时候,只需判断是单条(根据id找到对应partition或者shard)还是多条聚合(拉取所有使用到的分区数据)。

到此,一般一个存储系统就分成了两层:

  1. 服务层(Service); 为对应查询创建索引,如一个KV存储可能需要b-tree sstable,而一个搜索存储则需要倒排索引(inverted-index);
  2. 日志层(Log);
    1. 有序捕获状态变化;
    2. 负责接收写入(可能被上述服务层代理,但写入的最终处理是由日志层负责),写入到日志时同时产生一个时间戳作为时序的索引; 假定我们这个存储系统是经过了分区的分布式系统,那么服务层与日志层分区数一致(机器数可以不同)。

service_log_layer

如上图所示,Service节点订阅日志变更,发生写入时尽快以日志顺序同步到自己本地的索引中。此时客户端完成了read-your-write(读自己的写)语义:

  1. 查询时携带写入时生成的时间戳;
  2. Service节点比较此时间戳与其本地索引构建时记录的时间戳,如果发生了日志->服务层同步的延迟,客户端请求等待节点同步追赶,以防返回过期数据;

Service层在简单情况下无需处理主从、选举问题。因为此时日志保证了数据的一致性、准确性。

至此还有一个棘手的问题,是每个分布式系统都需要解决的:

  1. 节点失败后恢复数据;
  2. 节点间转移分区;

一种经典做法是:

  1. log中保留固定周期的数据(想想Redis中的AOF);
  2. 对数据进行全量快照(想想Redis中的RDB);
  3. log中保留全量数据,并对log进行整理压缩(相同key只保留最终值)(想想Redis中的AOF Rewrite)(Log Compaction),3这种做法等同于1+2的效果;

而上述在log层处理这类问题,则将负责度交给了log来处理,Service层无需关注。并且log层的处理是系统级别的,可以通用。

基于以上描述的log系统,我们也就有了一套可存储数据的ETL数据源,并且可以为其他系统提供完备的订阅接口。

log_centric_infrastructure_stack

到此,结合上图,我们发现log系统成为了其他系统处理、加载的数据流的提供者(provider of data streams)。同时,一个流处理者可接收多个输入,并且将数据提供给其他系统(以做其他类型索引的处理)。

回到我们P4部分的标题:系统设计、构建。JK觉得这种将查询、log分层是一种通用的系统设计思路:分离查询特性、一致性、可用性。这种设计思想对于我们构建系统、理解系统很有帮助。

Kafka 或者 Bookeeper不一定必须是持久化的设计。我们完全可以设计一个 Dynamo 这样只保证 AP 的最终一致(带来的一个副作用是消息会重放,消费端需保证幂等)的系统。

有人认为在log中保留全量数据是一种浪费。其实在很多方面,这个问题已经被弱化了:

  1. log日志是一种高效的存储格式。比如linkedin在2013年的时候,kafka服务端在每个数据中心存储了75TB的数据;同时应用层面则要求使用更多的内存,所以log更节省成本;
  2. 应用对硬件要求也更高,比如使用了SSD,而log只需要线性顺序写,使用普通磁盘进行存储即可;
  3. 结合上图,应用架构复杂后,参与其中的应用变多,每个业务形态中的索引都需要单独存储,这样整体上平摊了单个节点的费用(amortized over multiple indexes);

上述几点说明,log日志的存储成本并非问题。

围绕日志的这种架构模式也是领英在内部构建自己系统的方式,所有的服务订阅一个存储(使用databus作为一种日志抽象 或者 使用kafka来做日志系统)。应用层处理分区、索引、查询逻辑。领英就这样实现了他们的搜索、社交关系图、OLAP查询系统。

使用log的程序也需要根据情况适应调整,一个完全可靠的系统(如RDBMS)可以借助log来完成:

  1. 数据分区;
  2. 节点恢复;
  3. 负载均衡;
  4. 一致性;
  5. 数据广播;

如此一来,应用层(Service 服务层)变成了处理索引、查询的缓存模块。

P5:结束

If you made it this far you know most of what I know about logs. Everyone seems to uses different terms for the same things so it is a bit of a puzzle to connect the database literature to the distributed systems stuff to the various enterprise software camps to the open source world. Nonetheless, here are a few pointers in the general direction.

如果你读到了这里,JK大佬觉得,你已经了解了他对log的所有知识。最后JK大佬给大家提供了一些值得关注的资源:

学术论文、博客、talk