Memory Management improvements for Flink’s JobManager in Apache Flink 1.11

September 1, 2020 - Andrey Zagrebin

Apache Flink 1.11 comes with significant changes to the memory model of Flink’s JobManager and configuration options for your Flink clusters. These recently-introduced changes make Flink adaptable to all kinds of deployment environments (e.g. Kubernetes, Yarn, Mesos), providing better control over its memory consumption.

The previous blog post focused on the memory model of the TaskManagers and how it was improved in Flink 1.10. This post addresses the same topic but for the JobManager instead. Flink 1.11 unifies the memory model of Flink’s processes. The newly-introduced memory model of the JobManager follows a similar approach to that of the TaskManagers; it is simpler and has fewer components and tuning knobs. This post might consequently seem very similar to our previous story on Flink’s memory but aims at providing a complete overview of Flink’s JobManager memory model as of Flink 1.11. Read on for a full list of updates and changes below!

Having a clear understanding of Apache Flink’s process memory model allows you to manage resources for the various workloads more efficiently. The following diagram illustrates the main memory components of a Flink process:

[Figure: Flink's Total Process Memory]

The JobManager process is a JVM process. On a high level, its memory consists of the JVM Heap and Off-Heap memory. These types of memory are consumed by Flink directly or by the JVM for its specific purposes (e.g. metaspace). There are two major memory consumers within the JobManager process: the framework itself, which consumes memory for internal data structures, network communication, etc., and the user code which runs within the JobManager process, e.g. in certain batch sources or during checkpoint completion callbacks.

Note: The user code has direct access to all memory types: JVM Heap, Direct and Native memory. Therefore, Flink cannot really control its allocation and usage.

How to set up JobManager memory #

With the release of Flink 1.11, and in order to provide a better user experience, the Flink community introduced three alternatives for setting up the JobManager's memory.

The first two, and simplest, alternatives are to configure one of the following two options for the total memory available to the JVM process of the JobManager:

  • Total Process Memory: total memory consumed by the Flink Java application (including user code) and by the JVM to run the whole process.
  • Total Flink Memory: only the memory consumed by the Flink Java application, including user code but excluding any memory allocated by the JVM to run it.

It is advisable to configure the Total Flink Memory for standalone deployments, where explicitly declaring how much memory is given to Flink is common practice, while the outer JVM overhead is of little interest. When deploying Flink in containerized environments (such as Kubernetes, Yarn or Mesos), the Total Process Memory option is recommended instead, because it becomes the size of the total memory requested for the container. Containerized environments usually enforce this memory limit strictly.
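For illustration, and assuming the memory is configured through flink-conf.yaml, these two alternatives map to the options jobmanager.memory.process.size and jobmanager.memory.flink.size. The sizes below are purely illustrative, not recommendations:

```yaml
# flink-conf.yaml

# Containerized deployments (Kubernetes, Yarn, Mesos):
# size the whole JobManager container via the Total Process Memory.
jobmanager.memory.process.size: 1600m

# For standalone deployments, the Total Flink Memory can be declared instead:
# jobmanager.memory.flink.size: 1280m
```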

If you want more fine-grained control over the size of the JVM Heap, there is also the third alternative of configuring it directly. This alternative gives a clear separation between the heap memory and any other memory types.
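A minimal sketch of this third alternative, again with an illustrative value:

```yaml
# flink-conf.yaml

# Set the JVM Heap of the JobManager explicitly; the remaining
# memory components are then derived from their defaults.
jobmanager.memory.heap.size: 1024m
```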

The remaining memory components will be automatically adjusted, either based on their default values or on additionally-configured parameters. Apache Flink also checks the overall consistency. You can find more information about the different memory components in the corresponding documentation. You can also try out different configuration options with the configuration spreadsheet of FLIP-116 (you have to make a copy of the spreadsheet to edit it) and check the corresponding results for your individual case.

If you are migrating from a Flink version older than 1.11, we suggest following the steps in the migration guide of the Flink documentation.

Additionally, you can separately configure the Off-heap memory (JVM direct and non-direct memory) as well as the JVM metaspace and overhead. The JVM overhead is a fraction of the Total Process Memory and can be configured in a similar way to the other fractions described in our previous blog post about the TaskManager's memory model.
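As a sketch, these fine-grained options look as follows in flink-conf.yaml; the values shown are the defaults documented for Flink 1.11 and should be double-checked against the configuration reference of your Flink version:

```yaml
# flink-conf.yaml

# Off-heap memory reserved for JVM direct and native allocations
# made by Flink or the user code running in the JobManager.
jobmanager.memory.off-heap.size: 128m

# Upper limit for the JVM metaspace; enforced by the JVM.
jobmanager.memory.jvm-metaspace.size: 256m

# JVM overhead, derived as a fraction of the Total Process Memory
# and capped by the min/max bounds.
jobmanager.memory.jvm-overhead.fraction: 0.1
jobmanager.memory.jvm-overhead.min: 192m
jobmanager.memory.jvm-overhead.max: 1g
```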

More hints to control the container memory limit #

The heap and direct memory usage are managed by the JVM. In addition, there are many other possible sources of native memory consumption in Apache Flink or its user applications which are not managed directly by Flink or the JVM. Controlling their limits is often difficult, which complicates the debugging of potential memory leaks. If Flink's process allocates too much memory in an unmanaged way, it can often result in the container being killed in containerized environments. In this case, it might be hard to understand which type of memory consumption has exceeded its limit and how to resolve the problem. Flink 1.11 introduces specific tuning options to clearly represent such components for the JobManager's process. Although Flink cannot always enforce strict limits and borders among them, the idea is to explicitly plan the memory usage. Below we provide some examples of how the memory setup can prevent containers from exceeding their memory limit, followed by a configuration sketch that illustrates the corresponding options:

  • User code or its dependencies consume significant off-heap memory. Tuning the Off-heap option can assign additional direct or native memory to the user code or any of its dependencies. Flink cannot control native allocations but it sets the limit for JVM Direct memory allocations. The Direct memory limit is enforced by the JVM.

  • JVM metaspace requires additional memory. If you encounter an OutOfMemoryError: Metaspace, Flink provides an option to increase its default limit, and the JVM will ensure that it is not exceeded. In contrast to the default JVM settings, where the metaspace is not limited, the metaspace size of a Flink JVM process is always explicitly set.

  • JVM requires more internal memory. There is no direct control over certain types of JVM process allocations but Flink provides JVM Overhead options. The JVM Overhead options allow declaring an additional amount of memory, anticipated for those allocations and not covered by other options.
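As a rough, hedged sketch of how the scenarios above could translate into configuration changes (the concrete sizes depend entirely on your workload and are only placeholders):

```yaml
# flink-conf.yaml

# User code or dependencies allocate a lot of direct or native memory:
# grant the process more Off-heap memory.
jobmanager.memory.off-heap.size: 512m

# An OutOfMemoryError: Metaspace was observed:
# raise the metaspace limit enforced by the JVM.
jobmanager.memory.jvm-metaspace.size: 512m

# The JVM itself needs more internal memory (thread stacks, GC, code cache, ...):
# reserve additional JVM Overhead.
jobmanager.memory.jvm-overhead.min: 256m
jobmanager.memory.jvm-overhead.max: 1g
```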

Conclusion #

The latest Flink release (Flink 1.11) introduces some notable changes to the memory configuration of Flink’s JobManager, making its memory management significantly easier than before. Stay tuned for more additions and features in upcoming releases. If you have any suggestions or questions for the Flink community, we encourage you to sign up to the Apache Flink mailing lists and become part of the discussion.