State TTL in Flink 1.8.0: How to Automatically Clean Up Application State in Apache Flink

19 May 2019 Fabian Hueske (@fhueske) & Andrey Zagrebin

A common requirement for many stateful streaming applications is to automatically clean up application state, either to keep the state size manageable or to control how long application state can be accessed (e.g. due to legal regulations like the GDPR). The state time-to-live (TTL) feature was introduced in Flink 1.6.0 and enables application state cleanup and efficient state size management in Apache Flink.

In this post, we motivate the State TTL feature and discuss its use cases. Moreover, we show how to use and configure it. We explain how Flink internally manages state with TTL and present some exciting additions to the feature in Flink 1.8.0. The blog post concludes with an outlook on future improvements and extensions.

The Transient Nature of State

There are two major reasons why state should be maintained only for a limited time. As a running example, imagine a Flink application that ingests a stream of user login events and stores for each user the time of the last login to improve the experience of frequent visitors.

  • Controlling the size of state. Being able to efficiently manage an ever-growing state size is a primary use case for state TTL. Oftentimes, data needs to be persisted only temporarily while there is some user activity around it, e.g. web sessions. When the activity ends, there is no longer any interest in that data, yet it still occupies storage. Flink 1.8.0 introduces background cleanup of old state based on TTL, which makes the eviction of no-longer-necessary data frictionless. Previously, the application developer had to take extra actions and explicitly remove useless state to free storage space. This manual cleanup procedure was not only error prone but also less efficient than the new lazy method to remove state. In our example, the time of the last login might no longer be needed after a while because the user can be treated as “infrequent” later on.

  • Complying with data protection and sensitive data requirements. Recent developments around data privacy regulations, such as the General Data Protection Regulation (GDPR) introduced by the European Union, make compliance with such requirements and the careful treatment of sensitive data a top priority for many use cases and applications. One example is applications that must keep data for a specific timeframe and prevent access to it thereafter. This is a common challenge for companies providing short-term services to their customers. The state TTL feature gives guarantees for how long an application can access state and hence can help to comply with data protection regulations.

Both requirements can be addressed by a feature that continuously removes the state for a key in the background once it is no longer needed and there is no requirement to keep it in storage.

State TTL for continuous cleanup of application state

The 1.6.0 release of Apache Flink introduced the State TTL feature. It enables developers of stream processing applications to configure the state of operators to expire and be cleaned up after a defined timeout (time-to-live). In Flink 1.8.0 the feature was extended with continuous cleanup of old entries (according to the TTL setting) for both the RocksDB and the heap state backends (FsStateBackend and MemoryStateBackend).

In Flink’s DataStream API, application state is defined by a state descriptor. State TTL is configured by passing a StateTtlConfig object to a state descriptor. The following Java example shows how to create a state TTL configuration and provide it to the state descriptor that holds the last login time of a user as a Long value:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.state.ValueStateDescriptor;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<Long> lastUserLogin = 
    new ValueStateDescriptor<>("lastUserLogin", Long.class);

lastUserLogin.enableTimeToLive(ttlConfig);

Flink provides multiple options to configure the behavior of the state TTL functionality.

  • When is the Time-to-Live reset? By default, the expiration time of a state entry is updated when the state is modified. Optionally, it can also be updated on read access at the cost of an additional write operation to update the timestamp.

  • Can the expired state be accessed one last time? State TTL employs a lazy strategy to clean up expired state. This can lead to the situation that an application attempts to read state which is expired but hasn’t been removed yet. You can configure whether such a read request returns the expired state or not. In either case, the expired state is immediately removed afterwards. While the option of returning expired state favors data availability, not returning expired state can be required for data protection regulations.

  • Which time semantics are used for the Time-to-Live timers? With Flink 1.8.0, users can only define a state TTL in terms of processing time. The support for event time is planned for future Apache Flink releases.
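To make the first option more concrete, here is a minimal worked example in plain Java. It is not Flink code: the class `TtlResetSketch` and its enums are hypothetical and only mirror the names of the two update types. It assumes a TTL of 7 days, an entry written on day 0 and read on day 5, and computes when the entry expires under each update type.

```java
// Hypothetical model of the two TTL update types. The enum constants mirror
// StateTtlConfig.UpdateType, but this is not Flink code.
final class TtlResetSketch {
    enum UpdateType { ON_CREATE_AND_WRITE, ON_READ_AND_WRITE }
    enum Access { WRITE, READ }

    /**
     * Returns the day on which an entry expires. accesses[0] must be the WRITE that
     * creates the entry, and accessDays must be sorted in ascending order.
     */
    static long expiresOnDay(UpdateType type, long ttlDays, Access[] accesses, long[] accessDays) {
        long lastRelevantAccess = accessDays[0];
        for (int i = 1; i < accesses.length; i++) {
            if (accessDays[i] >= lastRelevantAccess + ttlDays) {
                break; // the entry had already expired before this access
            }
            boolean resetsTtl = accesses[i] == Access.WRITE || type == UpdateType.ON_READ_AND_WRITE;
            if (resetsTtl) {
                lastRelevantAccess = accessDays[i];
            }
        }
        return lastRelevantAccess + ttlDays;
    }

    public static void main(String[] args) {
        Access[] accesses = {Access.WRITE, Access.READ};
        long[] days = {0, 5};
        System.out.println(expiresOnDay(UpdateType.ON_CREATE_AND_WRITE, 7, accesses, days)); // 7
        System.out.println(expiresOnDay(UpdateType.ON_READ_AND_WRITE, 7, accesses, days));   // 12
    }
}
```

With the default update type the read on day 5 does not extend the lifetime, so the entry expires on day 7. If reads also reset the timer, the same entry lives until day 12.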

You can read more about how to use state TTL in the Apache Flink documentation.

Internally, the State TTL feature is implemented by storing an additional timestamp of the last relevant state access, along with the actual state value. While this approach adds some storage overhead, it allows Flink to check for the expired state during state access, checkpointing, recovery, or dedicated storage cleanup procedures.
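The idea of storing a timestamp next to the value can be sketched in a few lines of plain Java. This is an illustration under our own assumptions, not Flink's internal implementation: `TtlValueSketch`, `TtlValue`, and `isExpired` are hypothetical names, and timestamps are plain processing-time milliseconds.

```java
import java.time.Duration;

// Hypothetical illustration only; these are not Flink's internal classes.
final class TtlValueSketch {

    /** A user value stored together with the timestamp of its last relevant access. */
    static final class TtlValue<T> {
        final T userValue;
        final long lastAccessTimestamp; // processing time in milliseconds

        TtlValue(T userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** An entry is expired once the full TTL has elapsed since the stored timestamp. */
    static boolean isExpired(TtlValue<?> value, Duration ttl, long nowMillis) {
        return nowMillis - value.lastAccessTimestamp >= ttl.toMillis();
    }

    public static void main(String[] args) {
        Duration ttl = Duration.ofDays(7);
        TtlValue<Long> lastLogin = new TtlValue<>(1_000L, 1_000L);
        long sixDaysLater = 1_000L + Duration.ofDays(6).toMillis();
        long eightDaysLater = 1_000L + Duration.ofDays(8).toMillis();
        System.out.println(isExpired(lastLogin, ttl, sixDaysLater));   // false
        System.out.println(isExpired(lastLogin, ttl, eightDaysLater)); // true
    }
}
```

The extra `long` per entry is the storage overhead mentioned above. In exchange, any code path that touches an entry can decide on its own whether the entry is still valid.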

“Taking out the Garbage”

When a state object is accessed in a read operation, Flink will check its timestamp and clear the state if it is expired (depending on the configured state visibility, the expired state is returned or not). Due to this lazy removal, expired state that is never accessed again will forever occupy storage space unless it is garbage collected.
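The following plain-Java sketch models this lazy behavior and why it is not sufficient on its own. It is a simplified stand-in rather than Flink's implementation: `LazyTtlStoreSketch` is a hypothetical class, values are plain `long`s, and the boolean flag plays the role of the state visibility setting.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of lazy TTL cleanup on read; not Flink's implementation.
final class LazyTtlStoreSketch {
    // key -> {value, timestamp of last write}
    private final Map<String, long[]> entries = new HashMap<>();
    private final long ttlMillis;
    private final boolean returnExpiredIfNotCleanedUp;

    LazyTtlStoreSketch(long ttlMillis, boolean returnExpiredIfNotCleanedUp) {
        this.ttlMillis = ttlMillis;
        this.returnExpiredIfNotCleanedUp = returnExpiredIfNotCleanedUp;
    }

    void put(String key, long value, long now) {
        entries.put(key, new long[] {value, now});
    }

    /** Returns the value or null. An expired entry is removed by the read that finds it. */
    Long get(String key, long now) {
        long[] entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (now - entry[1] < ttlMillis) {
            return Long.valueOf(entry[0]);
        }
        entries.remove(key); // lazy cleanup: only happens because of this read
        return returnExpiredIfNotCleanedUp ? Long.valueOf(entry[0]) : null;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        LazyTtlStoreSketch store = new LazyTtlStoreSketch(100L, false);
        store.put("alice", 1L, 0L);
        store.put("bob", 2L, 0L);
        System.out.println(store.get("alice", 150L)); // null: expired and removed by this read
        System.out.println(store.size());             // 1: "bob" is expired but was never read
    }
}
```

The entry for "bob" illustrates the problem: it is long expired, but because nobody reads it, it keeps occupying storage.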

So how can the expired state be removed without the application logic explicitly taking care of it? In general, there are different possible strategies to remove it in the background.

Keep full state snapshots clean

Flink 1.6.0 already supported automatic eviction of the expired state when a full snapshot for a checkpoint or savepoint is taken. Note that state eviction is not applied for incremental checkpoints. State eviction on full snapshots must be explicitly enabled as shown in the following example:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .cleanupFullSnapshot()
    .build();

The local storage stays untouched but the size of the stored snapshot is reduced. The local state of an operator will only be cleaned up when the operator reloads its state from a snapshot, i.e. in case of recovery or when starting from a savepoint.
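A rough way to picture this is a snapshot routine that filters entries while copying them. The sketch below is plain Java with hypothetical names (`FullSnapshotCleanupSketch`, `snapshot`) and a simple map standing in for operator state. It is not how Flink serializes snapshots, only an illustration of which side gets cleaned.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of TTL filtering during a full snapshot; not Flink code.
final class FullSnapshotCleanupSketch {

    /**
     * Copies only non-expired entries (key -> {value, timestamp}) into the snapshot.
     * The local map is left unchanged.
     */
    static Map<String, long[]> snapshot(Map<String, long[]> local, long ttlMillis, long now) {
        Map<String, long[]> snapshot = new HashMap<>();
        for (Map.Entry<String, long[]> entry : local.entrySet()) {
            long timestamp = entry.getValue()[1];
            if (now - timestamp < ttlMillis) {
                snapshot.put(entry.getKey(), entry.getValue().clone());
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        Map<String, long[]> local = new HashMap<>();
        local.put("alice", new long[] {1L, 0L});  // written at t=0, expired at t=120
        local.put("bob", new long[] {2L, 90L});   // written at t=90, still valid at t=120
        Map<String, long[]> snap = snapshot(local, 100L, 120L);
        System.out.println(snap.keySet()); // [bob]
        System.out.println(local.size());  // 2: local state still holds the expired entry
    }
}
```

Restoring from `snap` would give the operator a local state without the expired entry, which corresponds to the recovery case described above.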

Due to these limitations, applications running on Flink 1.6.0 still need to actively remove state after it has expired. To improve the user experience, Flink 1.8.0 introduces two more autonomous cleanup strategies, one for each of Flink’s two state backend types. We describe them below.

Incremental cleanup in Heap state backends

This approach is specific to the heap state backends (FsStateBackend and MemoryStateBackend). The idea is that the storage backend keeps a lazy global iterator over all state entries. Certain events, for instance state access, trigger an incremental cleanup. Every time an incremental cleanup is triggered, the iterator is advanced. The traversed state entries are checked and expired ones are removed. The following code example shows how to enable incremental cleanup:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    // check 10 keys for every state access
    .cleanupIncrementally(10, false)
    .build();

If enabled, every state access triggers a cleanup step. For every cleanup step, a certain number of state entries are checked for expiration. There are two tuning parameters. The first defines the number of state entries to check for each cleanup step. The second parameter is a flag to trigger a cleanup step after each processed record, in addition to each state access.

There are two important caveats about this approach:

  • The first one is that the time spent for the incremental cleanup increases the record processing latency.

  • The second one should be practically negligible but is still worth mentioning: if no state is accessed or no records are processed, expired state won’t be removed.
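The mechanism can be sketched in plain Java as follows. This is a simplified model under our own assumptions, not the heap backend's code: `IncrementalCleanupSketch` is a hypothetical class, a `ConcurrentHashMap` is used only because its iterators tolerate concurrent modification, and the per-record flag is not modeled.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of incremental cleanup with a lazy global iterator; not Flink code.
final class IncrementalCleanupSketch {
    // key -> {value, timestamp of last write}
    private final Map<String, long[]> entries = new ConcurrentHashMap<>();
    private Iterator<Map.Entry<String, long[]>> cleanupIterator = entries.entrySet().iterator();
    private final long ttlMillis;
    private final int cleanupSize; // number of entries checked per cleanup step

    IncrementalCleanupSketch(long ttlMillis, int cleanupSize) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
    }

    void put(String key, long value, long now) {
        entries.put(key, new long[] {value, now});
        cleanupStep(now); // every state access triggers a cleanup step
    }

    Long get(String key, long now) {
        long[] entry = entries.get(key);
        Long result = (entry != null && now - entry[1] < ttlMillis) ? Long.valueOf(entry[0]) : null;
        cleanupStep(now);
        return result;
    }

    /** Advances the global iterator by up to cleanupSize entries and drops expired ones. */
    private void cleanupStep(long now) {
        for (int checked = 0; checked < cleanupSize; checked++) {
            if (!cleanupIterator.hasNext()) {
                cleanupIterator = entries.entrySet().iterator(); // wrap around
                if (!cleanupIterator.hasNext()) {
                    return; // nothing stored
                }
            }
            Map.Entry<String, long[]> entry = cleanupIterator.next();
            if (now - entry.getValue()[1] >= ttlMillis) {
                cleanupIterator.remove();
            }
        }
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        IncrementalCleanupSketch state = new IncrementalCleanupSketch(100L, 2);
        for (int i = 0; i < 6; i++) {
            state.put("user-" + i, i, 0L);
        }
        // Reading an unrelated key long after expiration still cleans up two entries per access.
        state.get("someone-else", 200L);
        System.out.println(state.size());
    }
}
```

Both caveats are visible in the sketch: `cleanupStep` runs on the caller's path, so it adds to the latency of each access, and without any call to `put` or `get` nothing is ever removed.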

RocksDB background compaction to filter out expired state

If your application uses the RocksDB state backend, you can enable another cleanup strategy which is based on a Flink specific compaction filter. RocksDB periodically runs asynchronous compactions to merge state updates and reduce storage. The Flink compaction filter checks the expiration timestamp of state entries with TTL and discards all expired values.
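As a very rough mental model, a compaction merges an older and a newer run of key-value pairs, and the filter gets a chance to drop entries while they are being rewritten. The plain-Java sketch below illustrates only that idea. `CompactionFilterSketch` and `compact` are hypothetical names, sorted maps stand in for RocksDB's files, and neither RocksDB's compaction nor Flink's actual filter works on Java maps like this.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a compaction pass with a TTL filter; not RocksDB or Flink code.
final class CompactionFilterSketch {

    /**
     * Merges two runs of entries (key -> {value, timestamp}). Newer updates override
     * older ones, and entries whose TTL has elapsed are dropped from the output.
     */
    static TreeMap<String, long[]> compact(
            Map<String, long[]> olderRun, Map<String, long[]> newerRun, long ttlMillis, long now) {
        TreeMap<String, long[]> merged = new TreeMap<>(olderRun);
        merged.putAll(newerRun); // merge state updates: the newer value wins
        merged.values().removeIf(v -> now - v[1] >= ttlMillis); // the TTL filter
        return merged;
    }

    public static void main(String[] args) {
        Map<String, long[]> older = new TreeMap<>();
        older.put("alice", new long[] {1L, 0L});
        older.put("bob", new long[] {2L, 0L});
        Map<String, long[]> newer = new TreeMap<>();
        newer.put("alice", new long[] {3L, 90L}); // alice was updated at t=90

        TreeMap<String, long[]> result = compact(older, newer, 100L, 120L);
        System.out.println(result.keySet()); // [alice]
    }
}
```

In this model "bob" disappears without ever being read, because the cleanup piggybacks on work the storage engine performs anyway.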

The first step to activate this feature is to configure the RocksDB state backend by setting the Flink configuration option state.backend.rocksdb.ttl.compaction.filter.enabled to true. Once the RocksDB state backend is configured, the compaction cleanup strategy is enabled for a state as shown in the following code example:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .cleanupInRocksdbCompactFilter()
    .build();

Keep in mind that calling the Flink TTL filter slows down the RocksDB compaction.

Eager State Cleanup with Timers

Another way to clean up state is based on Flink timers. This is an idea that the community is currently evaluating for future releases. With this approach, a cleanup timer is registered for every state access. The approach is more predictable because state is eagerly removed as soon as it expires. However, it is more expensive because the timers consume storage along with the original state.
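Since this strategy is still under evaluation, the following plain-Java sketch only illustrates the trade-off and does not describe a planned design. `TimerCleanupSketch` is a hypothetical class, a priority queue stands in for a timer service, and only writes are modeled as state accesses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical model of eager, timer-based TTL cleanup; not Flink code.
final class TimerCleanupSketch {
    private static final class CleanupTimer {
        final long fireTime;
        final String key;

        CleanupTimer(long fireTime, String key) {
            this.fireTime = fireTime;
            this.key = key;
        }
    }

    // key -> {value, timestamp of last write}
    private final Map<String, long[]> entries = new HashMap<>();
    private final PriorityQueue<CleanupTimer> timers =
        new PriorityQueue<CleanupTimer>((a, b) -> Long.compare(a.fireTime, b.fireTime));
    private final long ttlMillis;

    TimerCleanupSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(String key, long value, long now) {
        advanceTo(now);
        entries.put(key, new long[] {value, now});
        timers.add(new CleanupTimer(now + ttlMillis, key)); // one timer per access
    }

    /** Fires all timers that are due and removes entries that have really expired. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().fireTime <= now) {
            CleanupTimer timer = timers.poll();
            long[] entry = entries.get(timer.key);
            // Ignore stale timers: a later write has already moved the expiration forward.
            if (entry != null && entry[1] + ttlMillis <= timer.fireTime) {
                entries.remove(timer.key);
            }
        }
    }

    int size() {
        return entries.size();
    }

    int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerCleanupSketch state = new TimerCleanupSketch(100L);
        state.put("alice", 1L, 0L);
        state.put("bob", 2L, 0L);
        state.put("alice", 3L, 50L); // refreshes alice's TTL
        System.out.println(state.pendingTimers()); // 3
        state.advanceTo(100L);
        System.out.println(state.size()); // 1: bob removed exactly at expiration
        state.advanceTo(150L);
        System.out.println(state.size()); // 0
    }
}
```

State disappears exactly when it expires, without waiting for a read or a compaction. The cost is the extra queue of timers, which grows with the number of accesses.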

Future work

Apart from including the timer-based cleanup strategy mentioned above, the Flink community plans to further improve the state TTL feature. Possible improvements include adding TTL support for event time (only processing time is supported at the moment) and enabling State TTL for queryable state.

We encourage you to join the conversation and share your thoughts and ideas on the Apache Flink JIRA board or by subscribing to the Apache Flink dev mailing list. Feedback and suggestions are always appreciated.

Summary

Time-based state access restrictions and controlling the size of application state are common challenges in the world of stateful stream processing. Flink’s 1.8.0 release significantly improves the State TTL feature by adding support for continuous background cleanup of expired state objects. The new cleanup mechanisms relieve you of manually implementing state cleanup. They are also more efficient due to their lazy nature. State TTL gives you control over the size of your application state so that you can focus on the core logic of your applications.