如何在Apache Flink中使用RocksDB状态后端

如何在Apache Flink中使用RocksDB状态后端,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

流处理应用程序通常是有状态的,“记住”已处理事件中的信息,并使用它来影响进一步的事件处理。在Flink中,记住的信息,即状态,被本地存储在配置的状态后端中。为了防止发生故障时丢失数据,状态后端会定期将其内容的快照持久化到预先配置的持久存储中。RocksDB状态后端(即RocksDBStateBackend)是Flink中三个内置状态后端之一。这篇博文将引导您了解使用RocksDB管理应用程序状态的好处,解释何时以及如何使用它,并澄清一些常见的误解。

Flink中的状态

为了更好地理解Flink中的状态和状态后端,区分飞行状态(in-flight state)状态快照(state snapshots)是很重要的。飞行状态,也称为工作状态,是Flink作业正在处理的状态。它总是本地存储在内存中(有可能溢出到磁盘),并且在作业失败时可能会丢失,而不会影响作业的可恢复性。状态快照,即检查点和保存点,存储在远程持久存储器中,用于在作业失败时恢复本地状态。生产部署的适当状态后端取决于可伸缩性、吞吐量和延迟要求。

什么是RocksDB?

认为RocksDB是一个分布式数据库,需要在集群上运行并由专门的管理员管理,这是一个常见的误解。RocksDB是一个用于快速存储的可嵌入持久键值存储。它通过Java本机接口(JNI)与Flink交互。下图显示了RocksDB在Flink集群节点中的位置。以下各节将详细说明。

如何在Apache Flink中使用RocksDB状态后端  apache flink 第1张

Flink中的RocksDB

将RocksDB用作状态后端所需的一切都捆绑在Apache Flink发行版中,包括本机共享库:

$ jar -tvf lib/flink-dist_2.12-1.12.0.jar| grep librocksdbjni-linux64
8695334 Wed Nov 27 02:27:06 CET 2019 librocksdbjni-linux64.so

在运行时,RocksDB嵌入到TaskManager进程中。它在本机线程中运行并处理本地文件。例如,如果你的Flink集群中有一个配置了RocksDBStateBendback的作业,您将看到类似于下面的内容,其中32513是TaskManager进程ID。

$ ps -T -p 32513 | grep -i rocksdb
32513 32633 ?        00:00:00 rocksdb:low0
32513 32634 ?        00:00:00 rocksdb:high0

注意:该命令仅适用于Linux。对于其他操作系统,请参阅其文档

什么时候使用RocksDBStateBackend

除了RocksDBStateBackend,Flink还有另外两个内置的状态后端:MemoryStateBackend和FsStateBackend。它们都是基于堆的,因为运行中的状态存储在JVM堆中。目前,让我们忽略MemoryStateBackend,因为它只用于本地开发调试,不用于生产。

使用RocksDBStateBackend,运行中的状态首先写入堆外/本机内存,然后在达到配置的阈值时刷新到本地磁盘。这意味着RocksDBStateBackend可以支持大于总配置堆容量的状态。可以存储在RocksDBStateBackend中的状态量仅受整个集群中可用磁盘空间的限制。此外,由于RocksDBStateBackend不使用JVM堆来存储运行中的状态,因此它不受JVM垃圾收集的影响,因此具有可预测的延迟。

除了完整的、自包含的状态快照之外,RocksDBStateBackend还支持作为性能调优选项的增量检查点。增量检查点仅存储自上次完成的检查点以来发生的更改。与执行完整快照相比,这大大减少了检查点时间。RocksDBStateBendback是当前唯一支持增量检查点的状态后端。

在以下情况下,RocksDB是一个不错的选择:

  • 作业的状态超出了本地内存的容量(例如,长时间的窗口、大的KeyedState);

  • 你正在研究增量检查点作为一种减少检查点时间的方法;

  • 希望有更可预测的延迟,而不受JVM垃圾回收的影响

否则,如果应用程序的状态很小或需要很低的延迟,则应该考虑FsStateBackend。根据经验,RocksDBStateBackend比基于堆的状态后端慢几倍,因为它将键值对存储为序列化的字节。这意味着任何状态访问(读/写)都需要经过一个跨JNI边界的反序列化/序列化过程,这比直接使用堆上的状态表示更昂贵。好处是,对于相同数量的状态,与相应的堆上表示法相比,它的内存占用率较低

如何使用RocksDBStateBackend

RocksDB完全嵌入到TaskManager进程中,并完全由TaskManager进程管理。RocksDBStateBackend可以在集群级别配置为整个集群的默认值,也可以在作业级别配置为单个作业的默认值。作业级配置优先于集群级配置。

集群级别

conf/flink-conf.yaml中添加以下配置:

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///flink-checkpoints # location to store checkpoints

作业级别

创建StreamExecutionEnvironment后,将以下内容添加到作业的代码中:

# 'env' is the created StreamExecutionEnvironment
# 'true' is to enable incremental checkpointing
env.setStateBackend(new RocksDBStateBackend("hdfs:///fink-checkpoints", true));  

注意:除了HDFS之外,如果在FLINK_HOME/plugins下添加了相应的依赖项,那么还可以使用其他本地或基于云的对象存储。

最佳实践和高级配置

我们希望这个概述能帮助您更好地理解RocksDB在Flink中的作用,以及如何使用RocksDBStateBackend成功地运行作业。最后,我们将探讨一些最佳实践和一些参考点,以便进一步进行故障诊断和性能调优。

状态在RocksDB中的位置

如前所述,RocksDBStateBackend 中的运行中状态会溢出到磁盘上的文件。这些文件位于Flink配置指定的state.backend.rocksdb.localdir目录下。因为磁盘性能直接影响RocksDB的性能,所以建议将此目录放在本地磁盘上。不鼓励将其配置到基于网络的远程位置,如NFS或HDFS,因为写入远程磁盘通常比较慢。高可用性也不是飞行状态(in-flight state)的要求。如果需要高磁盘吞吐量,则首选本地SSD磁盘。

状态快照将持久化到远程持久存储。在状态快照期间,TaskManager会对飞行中的状态(in-flight state)进行快照并远程存储。将状态快照传输到远程存储完全由TaskManager本身处理,而不需要状态后端的参与。所以,state.checkpoints.dir 目录或者您在代码中为特定作业设置的参数可以是不同的位置,如本地HDFS集群或基于云的对象存储,如Amazon S3、Azure Blob Storage、Google cloud Storage、Alibaba OSS等。

RocksDB故障诊断

要检查RocksDB在生产中的行为,应该查找名为LOG的RocksDB日志文件。默认情况下,此日志文件与数据文件位于同一目录中,即Flink配置指定的目录state.backend.rocksdb.localdir。启用时,RocksDB统计信息也会记录在那里,以帮助诊断潜在的问题。有关更多信息,请查看RocksDB Wiki中的Troubleshooting Guide。如果你对RocksDB行为趋势感兴趣,可以考虑为你的Flink作业启用RocksDB本机指标。

注意:从Flink1.10开始,通过将日志级别设置为HEADER,RocksDB日志记录被有效地禁用。要启用它,请查看How to get RocksDB’s LOG file back for advanced troubleshooting。

警告:在Flink中启用RocksDB的原生指标可能会对您的工作产生负面影响。

从Flink 1.10开始,Flink默认将RocksDB的内存分配配置为每个任务槽的托管内存(managed memory)量。改善内存相关性能问题的主要机制是通过Flink配置taskmanager.memory.managed.sizetaskmanager.memory.managed.fraction增加Flink的托管内存。对于更细粒度的控制,应该首先通过设置state.backend.rocksdb.memory.managed为false,然后从以下Flink配置开始:state.backend.rocksdb.block.cache-size(与RocksDB中的块大小相对应),state.backend.rocksdb.writebuffer.size(与RocksDB中的write_buffer_size相对应),以及state.backend.rocksdb.writebuffer.count(对应于RocksDB中的最大写入缓冲区数)。有关更多详细信息,请查看这篇关于如何在Flink中管理RocksDB内存大小的文章和RocksDB内存使用Wiki页面。

在RocksDB中写入或覆盖数据时,RocksDB线程在后台管理从内存到本地磁盘的刷新和数据压缩。在多核CPU的机器上,应该通过设置Flink配置state.backend.rocksdb.thread.num(对应于RocksDB中的max_background_jobs)来增加后台刷新和压缩的并行性。对于生产设置来说,默认配置通常太小。如果你的工作经常从RocksDB读取内容,那么应该考虑启用布隆过滤器。

对于其他RocksDBStateBackend配置,请查看Flink文档Advanced RocksDB State Backends Options。有关进一步的调优,请查看RocksDB Wiki中的RocksDB Tuning Guide。

RocksDB状态后端(即RocksDBStateBackend)是Flink中捆绑的三种状态后端之一,在配置流应用程序时是一个很好的选择。它使可扩展的应用程序能够保持高达数TB的状态,并保证exactly-once。如果Flink作业的状态太大,无法放入JVM堆中,或者你对增量检查点很感兴趣,或者希望有可预测的延迟,那么应该使用RocksDBStateBackend。由于RocksDB作为本机线程嵌入到TaskManager进程中,并且可以处理本地磁盘上的文件,RocksDBStateBackend支持开箱即用,无需更多设置和管理任何外部系统或进程。

关于如何在Apache Flink中使用RocksDB状态后端问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注蜗牛博客行业资讯频道了解更多相关知识。

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:niceseo99@gmail.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

评论

有免费节点资源,我们会通知你!加入纸飞机订阅群

×
天气预报查看日历分享网页手机扫码留言评论Telegram