The Apache Flink community is pleased to announce Apache Flink 1.8.0. The latest release includes more than 420 resolved issues and some exciting additions to Flink that we describe in the following sections of this post. Please check the complete changelog for more details.
Flink 1.8.0 is API-compatible with previous 1.x.y releases for APIs annotated
@Public annotation. The release is available now and we encourage
everyone to download the release and
check out the updated
Feedback through the Flink mailing
JIRA is, as always,
very much appreciated!
You can find the binaries on the updated Downloads page on the Flink project site.
With Flink 1.8.0 we come closer to our goals of enabling fast data processing and building data-intensive applications for the Flink community in a seamless way. We do this by cleaning up and refactoring Flink under the hood to allow more efficient feature development in the future. This includes removal of the legacy runtime components that were subsumed in the major rework of Flink’s underlying distributed system architecture (FLIP-6) as well as refactorings on the Table API that prepare it for the future addition of the Blink enhancements (FLINK-11439).
Nevertheless, this release includes some important new features and bug fixes. The most interesting of those are highlighted below. Please consult the complete changelog and the release notes for more details.
New Features and Improvements #
Finalized State Schema Evolution Story: This release completes the community driven effort to provide a schema evolution story for user state managed by Flink. This has been an effort that spanned 2 releases, starting from 1.7.0 with the introduction of support for Avro state schema evolution as well as a revamped serialization compatibility abstraction.
Flink 1.8.0 finalizes this effort by extending support for schema evolution to POJOs, upgrading all Flink built-in serializers to use the new serialization compatibility abstractions, as well as making it easier for advanced users who use custom state serializers to implement the abstractions. These different aspects for a complete out-of-the-box schema evolution story are explained in detail below:
Support for POJO state schema evolution: The pool of data types that support state schema evolution has been expanded to include POJOs. For state types that use POJOs, you can now add or remove fields from your POJO while retaining backwards compatibility. For a full overview of the list of data types that now support schema evolution as well as their evolution specifications and limitations, please refer to the State Schema Evolution documentation page.
Upgrade all Flink serializers to use new serialization compatibility asbtractions: Back in 1.7.0, we introduced the new serialization compatibility abstractions
TypeSerializerSchemaCompatibility. Besides providing a more expressible API to reflect schema compatibility between the data stored in savepoints and the data registered at runtime, another important aspect about the new abstraction is that it avoids the need for Flink to Java-serialize the state serializer as state metadata in savepoints.
In 1.8.0, all of Flink’s built-in serializers have been upgraded to use the new abstractions, and therefore the serializers themselves are no longer Java-serialized into savepoints. This greatly improves interoperability of Flink savepoints, in terms of state schema evolvability. For example, one outcome was the support for POJO schema evolution, as previously mentioned above. Another outcome is that all composite data types supported by Flink (such as
Either, Scala case classes, Flink Java
Tuples, etc.) are generally evolve-able as well when they have a nested evolvable type, such as a POJO. For example, the
ListState<Either<Integer, MyPojo>>, which is a POJO, is allowed to evolve its schema.
For users who are using custom
TypeSerializerimplementations for their state serializer and are still using the outdated abstractions (i.e.
CompatiblityResult), we highly recommend upgrading to the new abstractions to be future proof. Please refer to the Custom State Serialization documentation page for a detailed description on the new abstractions.
Provide pre-defined snapshot implementations for common serializers: For convenience, Flink 1.8.0 comes with two predefined implementations for the
TypeSerializerSnapshotthat make the task of implementing these new abstractions easier for most implementations of
CompositeTypeSerializerSnapshot. This section in the documentation provides information on how to use these classes.
Continuous cleanup of old state based on TTL (FLINK-7811): We introduced TTL (time-to-live) for Keyed state in Flink 1.6 (FLINK-9510). This feature enabled cleanup and made keyed state entries inaccessible after a defined timeout. In addition state would now also be cleaned up when writing a savepoint/checkpoint.
Flink 1.8 introduces continuous cleanup of old entries for both the RocksDB state backend (FLINK-10471) and the heap state backend (FLINK-10473). This means that old entries (according to the TTL setting) are continuously cleaned up.
SQL pattern detection with user-defined functions and aggregations: The support of the MATCH_RECOGNIZE clause has been extended by multiple features. The addition of user-defined functions allows for custom logic during pattern detection (FLINK-10597), while adding aggregations allows for more complex CEP definitions, such as the following (FLINK-7599).
SELECT * FROM Ticker MATCH_RECOGNIZE ( ORDER BY rowtime MEASURES AVG(A.price) AS avgPrice ONE ROW PER MATCH AFTER MATCH SKIP TO FIRST B PATTERN (A+ B) DEFINE A AS AVG(A.price) < 15 ) MR;
RFC-compliant CSV format (FLINK-9964): The SQL tables can now be read and written in an RFC-4180 standard compliant CSV table format. The format might also be useful for general DataStream API users.
New KafkaDeserializationSchema that gives direct access to ConsumerRecord (FLINK-8354): For the Flink
KafkaConsumers, we introduced a new
KafkaDeserializationSchemathat gives direct access to the Kafka
ConsumerRecord. This now allows access to all data that Kafka provides for a record, including the headers. This subsumes the
KeyedSerializationSchemafunctionality, which is deprecated but still available for now.
Per-shard watermarking option in FlinkKinesisConsumer (FLINK-5697): The Kinesis Consumer can now emit periodic watermarks that are derived from per-shard watermarks, for correct event time processing with subtasks that consume multiple Kinesis shards.
New consumer for DynamoDB Streams to capture table changes (FLINK-4582):
FlinkDynamoDBStreamsConsumeris a variant of the Kinesis consumer that supports retrieval of CDC-like streams from DynamoDB tables.
Support for global aggregates for subtask coordination (FLINK-10887): Designed as a solution for global source watermark tracking,
GlobalAggregateManagerallows sharing of information between parallel subtasks. This feature will be integrated into streaming connectors for watermark synchronization and can be used for other purposes with a user defined aggregator.
Important Changes #
Changes to bundling of Hadoop libraries with Flink (FLINK-11266): Convenience binaries that include hadoop are no longer released.
If a deployment relies on
flink-shaded-hadoop2being included in
flink-dist, then you must manually download a pre-packaged Hadoop jar from the optional components section of the download page and copy it into the
/libdirectory. Alternatively, a Flink distribution that includes hadoop can be built by packaging
flink-distand activating the
As hadoop is no longer included in
flink-distby default, specifying
flink-distno longer impacts the build.
FlinkKafkaConsumer will now filter restored partitions based on topic specification (FLINK-10342): Starting from Flink 1.8.0, the
FlinkKafkaConsumernow always filters out restored partitions that are no longer associated with a specified topic to subscribe to in the restored execution. This behaviour did not exist in previous versions of the
FlinkKafkaConsumer. If you wish to retain the previous behaviour, please use the
disableFilterRestoredPartitionsWithSubscribedTopics()configuration method on the
Consider this example: if you had a Kafka Consumer that was consuming from topic
A, you did a savepoint, then changed your Kafka consumer to instead consume from topic
B, and then restarted your job from the savepoint. Before this change, your consumer would now consume from both topic
Bbecause it was stored in state that the consumer was consuming from topic
A. With the change, your consumer would only consume from topic
Bafter restore because it now filters the topics that are stored in state using the configured topics.
Change in the Maven modules of Table API (FLINK-11064): Users that had a
flink-tabledependency before, need to update their dependencies to
flink-table-plannerand the correct dependency of
flink-table-api-*, depending on whether Java or Scala is used: one of
Known Issues #
- Discarded checkpoint can cause Tasks to fail (FLINK-11662): There is a race condition that can lead to erroneous checkpoint failures. This mostly occurs when restarting from a savepoint or checkpoint takes a long time at the sources of a job. If you see random checkpointing failures that don’t seem to have a good explanation you might be affected. Please see the Jira issue for more details and a workaround for the problem.
Release Notes #
Please review the release notes for a more detailed list of changes and new features if you plan to upgrade your Flink setup to Flink 1.8.
List of Contributors #
We would like to acknowledge all community members for contributing to this
release. Special credits go to the following members for contributing to the
1.8.0 release (according to
git log --pretty="%an" release-1.7.0..release-1.8.0 | sort | uniq without manual deduplication):
Addison Higham, Aitozi, Aleksey Pak, Alexander Fedulov, Alexey Trenikhin, Aljoscha Krettek, Andrey Zagrebin, Artsem Semianenka, Asura7969, Avi, Barisa Obradovic, Benchao Li, Bo WANG, Chesnay Schepler, Congxian Qiu, Cristian, David Anderson, Dawid Wysakowicz, Dian Fu, DuBin, EAlexRojas, EronWright, Eugen Yushin, Fabian Hueske, Fokko Driesprong, Gary Yao, Hequn Cheng, Igal Shilman, Jamie Grier, JaryZhen, Jeff Zhang, Jihyun Cho, Jinhu Wu, Joerg Schad, KarmaGYZ, Kezhu Wang, Konstantin Knauf, Kostas Kloudas, Lakshmi, Lakshmi Gururaja Rao, Lavkesh Lahngir, Li, Shuangjiang, Mai Nakagawa, Matrix42, Matt, Maximilian Michels, Mododo, Nico Kruber, Paul Lin, Piotr Nowojski, Qi Yu, Qin, Robert, Robert Metzger, Romano Vacca, Rong Rong, Rune Skou Larsen, Seth Wiesman, Shannon Carey, Shimin Yang, Shuyi Chen, Stefan Richter, Stephan Ewen, SuXingLee, TANG Wen-hui, Tao Yang, Thomas Weise, Till Rohrmann, Timo Walther, Tom Goong, Tony Feng, Tony Wei, Tzu-Li (Gordon) Tai, Tzu-Li Chen, Ufuk Celebi, Xingcan Cui, Xpray, XuQianJin-Stars, Xue Yu, Yangze Guo, Ying Xu, Yiqun Lin, Yu Li, Yuanyang Wu, Yun Tang, ZILI CHEN, Zhanchun Zhang, Zhijiang, ZiLi Chen, acqua.csq, alex04.wang, ap, azagrebin, blueszheng, boshu Zheng, chengjie.wu, chensq, chummyhe89, eaglewatcherwb, hequn8128, ifndef-SleePy, intsmaze, jackyyin, jinhu.wjh, jparkie, jrthe42, junsheng.wu, kgorman, kkloudas, kkolman, klion26, lamber-ken, leesf, libenchao, lining, liuzhaokun, lzh3636, maqingxiang, mb-datadome, okidogi, park.yq, sunhaibotb, sunjincheng121, tison, unknown, vinoyang, wenhuitang, wind, xueyu, xuqianjin, yanghua, zentol, zhangzhanchun, zhijiang, zhuzhu.zz, zy, 仲炜, 砚田, 谢磊