Announcing the Release of Apache Flink 1.17
March 23, 2023 - Leonard Xu (@Leonardxbj)
The Apache Flink PMC is pleased to announce Apache Flink release 1.17.0. Apache Flink is the leading stream processing standard, and the concept of unified stream and batch data processing is being successfully adopted in more and more companies. Thanks to our excellent community and contributors, Apache Flink continues to grow as a technology and remains one of the most active projects in the Apache Software Foundation. Flink 1.17 had 172 contributors enthusiastically participating and saw the completion of 7 FLIPs and 600+ issues, bringing many exciting new features and improvements to the community.
Towards Streaming Warehouses #
In order to achieve greater efficiency in the realm of streaming warehouse, Flink 1.17 contains substantial improvements to both the performance of batch processing and the semantics of streaming processing. These improvements represent a significant stride towards the creation of a more efficient and streamlined data warehouse, capable of processing large quantities of data in real-time.
For batch processing, this release includes several new features and improvements:
- Streaming Warehouse API: FLIP-282 introduces the new Delete and Update API in Flink SQL which only works in batch mode. External storage systems like Flink Table Store can implement row-level modification via this new API. The ALTER TABLE syntax is enhanced by including the ability to ADD/MODIFY/DROP columns, primary keys, and watermarks, making it easier for users to maintain their table schema.
- Batch Execution Improvements: Execution of batch workloads has been significantly improved in Flink 1.17 in terms of performance, stability and usability. Performance-wise, a 26% TPC-DS improvement on a 10T dataset is achieved with strategy and operator optimizations, such as new join reordering and adaptive local hash aggregation, Hive aggregate function improvements, and Hybrid Shuffle Mode enhancements. Stability-wise, Speculative Execution now supports all operators, and the Adaptive Batch Scheduler is more robust against data skew. Usability-wise, the tuning effort required for batch workloads has been reduced: the Adaptive Batch Scheduler is now the default scheduler in batch mode, the Hybrid Shuffle Mode is compatible with Speculative Execution and the Adaptive Batch Scheduler, and various configuration options have been simplified.
- SQL Client/Gateway: Apache Flink 1.17 introduces the “gateway mode” for SQL Client, allowing users to submit SQL queries to a SQL Gateway for enhanced functionality. Users can use SQL statements to manage job lifecycles, including displaying job information and stopping running jobs. This provides a powerful tool for managing Flink jobs.
For stream processing, the following features and improvements are realized:
- Streaming SQL Semantics: Non-deterministic operations can cause incorrect results or exceptions, which makes them a challenging topic in streaming SQL. Incorrect optimization plans and functional issues have been fixed, and the experimental feature PLAN_ADVICE is introduced to inform SQL users of potential correctness risks and optimization suggestions.
- Checkpoint Improvements: The generic incremental checkpoint improvements enhance the speed and stability of the checkpoint procedure, and the unaligned checkpoint has improved stability under backpressure and is production-ready in Flink 1.17. Users can manually trigger checkpoints with self-defined checkpoint types while a job is running with the newly introduced REST interface for triggering checkpoints.
- Watermark Alignment Enhancement: Efficient watermark processing directly affects the execution efficiency of event time applications. In Flink 1.17, FLIP-217 introduces an improvement to watermark alignment by aligning data emission across splits within a source operator. This improvement results in more efficient coordination of watermark progress in the source, which in turn mitigates excessive buffering by downstream operators and enhances the overall efficiency of streaming job execution.
- StateBackend Upgrade: Upgrading FRocksDB to 6.20.3-ververica-2.0 brings improvements to RocksDBStateBackend, such as sharing memory between slots, and adds support for Apple Silicon chipsets like the Mac M1.
Batch processing #
As a unified stream and batch data processing engine, Flink stands out particularly in the field of stream processing. To improve its batch processing capabilities, community contributors put a lot of effort into improving Flink’s batch performance and ecosystem in version 1.17. This makes it easier for users to build a streaming warehouse based on Flink.
Speculative Execution #
Speculative Execution for sinks is now supported. Previously, Speculative Execution was not enabled for sinks to avoid instability or incorrect results. In Flink 1.17, the context of sinks is improved so that sinks, including new sinks and OutputFormat sinks, are aware of the number of attempts. With the number of attempts, sinks are able to isolate the produced data of different attempts of the same subtask, even if the attempts are running at the same time. The FinalizeOnMaster interface is also improved so that OutputFormat sinks can see which attempts are finished and then properly commit the written data. Once a sink can work well with concurrent attempts, it can implement the decorative interface SupportsConcurrentExecutionAttempts so that Speculative Execution is allowed to be performed on it. Some built in sinks are enabled to do Speculative Execution, including DiscardingSink, PrintSinkFunction, PrintSink, FileSink, FileSystemOutputFormat and HiveTableSink.
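The attempt-isolation idea described above can be sketched outside of Flink. The following is a minimal illustration in plain Python, not Flink's sink code: the file naming scheme and the `write_attempt` and `finalize` helpers are hypothetical. Each attempt writes to a file that carries its attempt number, and the finalize step commits only the output of the attempt that finished for each subtask.

```python
import os
import tempfile


def write_attempt(base_dir: str, subtask: int, attempt: int, records: list[str]) -> str:
    """Write one attempt's output to a file whose name carries the attempt number."""
    # Hypothetical naming scheme: concurrent attempts never share a file.
    path = os.path.join(base_dir, f".part-{subtask}-attempt-{attempt}.inprogress")
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(record + "\n" for record in records)
    return path


def finalize(base_dir: str, finished: dict[int, int]) -> list[str]:
    """Commit the finished attempt of each subtask and discard all other attempts.

    `finished` maps a subtask index to the attempt number that finished.
    """
    committed = []
    for name in sorted(os.listdir(base_dir)):
        if not name.endswith(".inprogress"):
            continue
        # name looks like ".part-<subtask>-attempt-<attempt>.inprogress"
        _, subtask, _, attempt = name[1:-len(".inprogress")].split("-")
        src = os.path.join(base_dir, name)
        if finished.get(int(subtask)) == int(attempt):
            dst = os.path.join(base_dir, f"part-{subtask}")
            os.replace(src, dst)  # rename the winning attempt to its final name
            committed.append(dst)
        else:
            os.remove(src)  # drop data written by slower or failed attempts
    return committed


if __name__ == "__main__":
    out_dir = tempfile.mkdtemp()
    write_attempt(out_dir, subtask=0, attempt=0, records=["a", "b"])
    write_attempt(out_dir, subtask=0, attempt=1, records=["a", "b"])
    print(finalize(out_dir, finished={0: 1}))
```

Because both attempts write to distinct files, they can run at the same time without corrupting each other's output, and exactly one copy of the data survives the commit.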
The slow task detection is improved for Speculative Execution. Previously, it only considered the execution time of tasks when deciding which tasks are slow. It now takes the input data volume of tasks into account. Tasks which have a longer execution time but consume more data may not be considered as slow. This improvement helps to eliminate the negative impacts from data skew on slow task detection.
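To make the effect of considering input volume concrete, here is a simplified sketch. It is not Flink's detector: the `find_slow_tasks` function, the median baseline and the `ratio` threshold are assumptions chosen for illustration. Execution time is normalized by the amount of input data before tasks are compared.

```python
from statistics import median


def find_slow_tasks(tasks: dict[str, tuple[float, int]], ratio: float = 1.5) -> list[str]:
    """Return the ids of tasks that are slow relative to the data they consume.

    `tasks` maps a task id to (execution_seconds, input_bytes).
    `ratio` is a hypothetical threshold over the median cost per byte.
    """
    # Normalize execution time by input volume (seconds per byte).
    per_byte = {tid: secs / max(nbytes, 1) for tid, (secs, nbytes) in tasks.items()}
    baseline = median(per_byte.values())
    return sorted(tid for tid, cost in per_byte.items() if cost > ratio * baseline)


if __name__ == "__main__":
    tasks = {
        "a": (10.0, 100),
        "b": (12.0, 100),
        "c": (40.0, 400),  # long-running, but it processes 4x the data
        "d": (40.0, 100),  # long-running with a normal amount of data
    }
    print(find_slow_tasks(tasks))
```

In this example only task `d` is reported. Task `c` runs just as long, but its cost per byte matches the fast tasks, so a skewed input does not make it look slow.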
Adaptive Batch Scheduler #
Adaptive Batch Scheduler is now used for batch jobs by default. This scheduler can automatically decide an appropriate parallelism for each job vertex, based on how much data the vertex processes. It is also the only scheduler which supports Speculative Execution.
The configuration of Adaptive Batch Scheduler is improved for ease of use. Users no longer need to explicitly set the global default parallelism to -1 to enable automatically deciding parallelism. Instead, the global default parallelism, if set, will be used as the upper bound when deciding the parallelism. The keys of Adaptive Batch Scheduler configuration options are also renamed to be easier to understand.
The capabilities of Adaptive Batch Scheduler are also improved. It now supports evenly distributing data to downstream tasks, based on fine-grained data distribution information. The limitation that the decided parallelism of vertices can only be a power of 2 is no longer needed and therefore removed.
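As a rough illustration of deciding parallelism from data volume, consider the sketch below. It is an assumption-laden simplification rather than the scheduler's actual logic: `decide_parallelism` and its parameters are hypothetical names, with `max_parallelism` standing in for the upper bound that the global default parallelism provides when set.

```python
def decide_parallelism(input_bytes: int, bytes_per_task: int,
                       min_parallelism: int = 1, max_parallelism: int = 128) -> int:
    """Pick a parallelism so that each task handles roughly `bytes_per_task`."""
    # Ceiling division with integers avoids floating point rounding.
    wanted = -(-input_bytes // bytes_per_task)
    # Clamp to the configured bounds; the result need not be a power of 2.
    return max(min_parallelism, min(wanted, max_parallelism))


if __name__ == "__main__":
    gib = 1024 ** 3
    print(decide_parallelism(3 * gib, gib))                       # small vertex
    print(decide_parallelism(10 * gib, gib, max_parallelism=8))   # capped by the upper bound
```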
Hybrid Shuffle Mode #
Various important improvements to the Hybrid Shuffle Mode are available in this release.
- Hybrid Shuffle Mode now supports Adaptive Batch Scheduler and Speculative Execution.
- Hybrid Shuffle Mode now supports reusing intermediate data when possible, which brings significant performance improvements.
- Stability is improved for large-scale production workloads.
More details can be found at the Hybrid-Shuffle section of the documentation.
TPC-DS Benchmark #
Starting with Flink 1.16, the performance of the batch engine has continuously been optimized. In Flink 1.16, dynamic partition pruning was introduced, but not all TPC-DS queries could be optimized. In Flink 1.17, the algorithm has been improved, and most TPC-DS queries can now be optimized. Flink 1.17 also introduces a dynamic programming join-reorder algorithm, which works better and has a larger search space than the previous algorithm. The planner can automatically select the appropriate join-reorder algorithm based on the number of joins in a query, so that users no longer need to care about the join-reorder algorithms. (Note: join reordering is disabled by default, and you need to enable it when running TPC-DS.) In the operator layer, a dynamic local hash aggregation strategy is introduced, which decides at runtime, based on the data distribution, whether the local hash aggregation is needed to improve performance. In the runtime layer, some unnecessary virtual function calls are removed to speed up execution. To summarize, Flink 1.17 has a 26% performance improvement compared to Flink 1.16 on a 10T dataset for partitioned tables.
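The idea behind the dynamic local hash aggregation can be illustrated with a small sketch. This is not the operator's implementation: the sampling rule, `sample_size` and `max_distinct_ratio` are hypothetical. The local aggregation samples the first rows and, if the keys turn out to be mostly distinct, stops pre-aggregating and forwards the remaining rows unchanged.

```python
from collections import defaultdict
from typing import Iterable, Iterator


def adaptive_local_sum(rows: Iterable[tuple[str, int]], sample_size: int = 1000,
                       max_distinct_ratio: float = 0.5) -> Iterator[tuple[str, int]]:
    """Pre-aggregate (key, value) rows locally unless sampling shows little reduction."""
    partial: dict[str, int] = defaultdict(int)
    seen = 0
    it = iter(rows)
    for key, value in it:
        partial[key] += value
        seen += 1
        if seen == sample_size and len(partial) / seen > max_distinct_ratio:
            # Keys are mostly distinct, so local aggregation would barely shrink
            # the data: flush the partial sums and pass the rest through as-is.
            yield from partial.items()
            yield from it
            return
    # Local aggregation paid off: emit one partial sum per key.
    yield from partial.items()


if __name__ == "__main__":
    skewed = [("a", 1)] * 10 + [("b", 2)] * 10
    unique = [(str(i), 1) for i in range(10)]
    print(len(list(adaptive_local_sum(skewed, sample_size=4))))
    print(len(list(adaptive_local_sum(unique, sample_size=4))))
```

Either way the downstream global aggregation still produces the correct result, since it sums whatever partial or raw values it receives per key. The choice only affects how much work and data the local stage costs.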
SQL Client / Gateway #
Apache Flink 1.17 introduces a new feature called “gateway mode” for the SQL Client, which enhances its functionality by allowing it to connect to a remote gateway and submit SQL queries like it does in embedded mode. This new mode offers users much more convenience when working with the SQL Gateway.
In addition, the SQL Client/SQL Gateway now provides new support for managing job lifecycles through SQL statements. Users can use SQL statements to display all job information stored in the JobManager and stop running jobs using their unique job IDs. With this new feature, SQL Client/Gateway now has almost the same functionality as Flink CLI, making it another powerful tool for managing Flink jobs.
SQL API #
Row-Level SQL Delete & Update are becoming more and more important in modern big data workflows. The use cases include deleting a set of rows for regulatory compliance, updating a set of rows for data correction, etc. Many popular engines such as Trino or Hive have supported them. In Flink 1.17, the new Delete & Update API is introduced in Flink, which works in batch mode and is exposed to connectors. Now external storage systems can implement row-level modification via this new API. Furthermore, the ALTER TABLE syntax is extended to include the ability to ADD/MODIFY/DROP columns, primary keys, and watermarks. These enhanced capabilities provide users with the flexibility to maintain their table schema metadata according to their needs.
Hive Compatibility #
Apache Flink 1.17 brings new improvements to the Hive table sink, making it more efficient than ever before. In previous versions, the Hive table sink only supported automatic file compaction in streaming mode, but not in batch mode. In Flink 1.17, the Hive table sink can now automatically compact newly written files in batch mode as well. This feature can greatly reduce the number of small files. Also, for using Hive built-in functions via HiveModule, Flink introduces several native Hive aggregation functions including SUM/COUNT/AVG/MIN/MAX to HiveModule. These functions can be executed using the hash-based aggregation operator and then bring significant performance improvements.
Streaming Processing #
In Flink 1.17, difficult Streaming SQL semantics and correctness issues are addressed, checkpoint performance is optimized, watermark alignment is enhanced, Streaming FileSink gains ABFS (Azure Blob Filesystem) support, and Calcite and FRocksDB have been upgraded to newer versions. These improvements further enhance the capabilities of Flink in the field of stream processing.
Streaming SQL Semantics #
In terms of correctness and semantic enhancement, Flink 1.17 introduces the experimental feature PLAN_ADVICE that detects potential correctness risks and provides optimization suggestions. For example, if an NDU (Non-deterministic Updates) issue is detected by EXPLAIN PLAN_ADVICE, the optimizer appends the advice at the end of the physical plan, tags the advice id to the relational nodes of the related operations, and recommends that users update their configurations accordingly. By providing users with this specific advice, the optimizer can help them improve the accuracy and reliability of their query results.
== Optimized Physical Plan With Advice ==
...
advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.
PLAN_ADVICE also helps users improve the performance and efficiency of their queries. For example, when a GroupAggregate operation is detected that can be optimized to a more efficient local-global aggregation, the optimizer attaches advice such as the following:
== Optimized Physical Plan With Advice ==
...
advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').
In addition, Flink 1.17 resolved several incorrect plan optimizations which led to incorrect results reported in FLINK-29849, FLINK-30006, and FLINK-30841.
Checkpoint Improvements #
Generic Incremental Checkpoint (GIC) aims to improve the speed and stability of the checkpoint procedure. Some experimental results in the WordCount case are shown below. More details can be found in this blog post.
Table1: Benefits after enabling GIC in WordCount case
Table2: Costs after enabling GIC in WordCount case
Unaligned Checkpoints (UC) greatly increase the checkpoint completion ratio under backpressure. The previous UC implementation wrote many small files, which could put a high load on the HDFS NameNode. In this release, this problem is resolved to make UC more usable in production environments.
In 1.17, a REST API is provided so that users can manually trigger checkpoints with a self-defined checkpoint type while a job is running. For example, for a job running with incremental checkpoint, users can trigger a full checkpoint periodically or manually to break the incremental checkpoint chain to avoid referring to files from a long time ago.
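A manual trigger could be scripted roughly as follows. This sketch assumes the endpoint is `POST /jobs/<jobid>/checkpoints` with a JSON body containing a `checkpointType` field, which should be verified against the REST API documentation for your Flink version. The REST address and job id are placeholders, and `build_trigger_request` is a hypothetical helper that only constructs the request.

```python
import json
import urllib.request


def build_trigger_request(rest_url: str, job_id: str,
                          checkpoint_type: str = "FULL") -> urllib.request.Request:
    """Build (but do not send) a request that asks Flink to trigger a checkpoint."""
    # Assumed request shape: verify the path and field name for your version.
    body = json.dumps({"checkpointType": checkpoint_type}).encode("utf-8")
    return urllib.request.Request(
        url=f"{rest_url.rstrip('/')}/jobs/{job_id}/checkpoints",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Placeholder address and job id; replace with real values before sending.
    req = build_trigger_request("http://localhost:8081", "<job-id>")
    print(req.get_method(), req.full_url)
    # To actually send it: urllib.request.urlopen(req)
```

Running such a script on a schedule is one way to periodically break a long incremental checkpoint chain with a full checkpoint.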
Watermark Alignment Support #
In earlier versions, FLIP-182 proposed a solution called watermark alignment to tackle the issue of data skew in event time applications caused by imbalanced sources. However, it had a limitation that the source parallelism had to match the number of splits. This was because a source operator with multiple splits might need to buffer a considerable amount of data if one split emitted data faster than another. To address this limitation, Flink 1.17 introduced FLIP-217, which enhances watermark alignment to align data emission across splits within a source operator while considering watermark boundaries. This enhancement ensures more coordinated watermark progress in the source, preventing downstream operators from buffering excessive data and improving the execution efficiency of streaming jobs.
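The split-level alignment can be pictured with a small sketch. It is a simplification and not Flink's source code: `splits_to_pause` and `max_drift_ms` are hypothetical names, and the slowest split of a single reader is used as the reference point. Splits whose watermark runs too far ahead are paused until the others catch up.

```python
def splits_to_pause(split_watermarks: dict[str, int], max_drift_ms: int) -> set[str]:
    """Return the splits whose watermark is too far ahead of the slowest split.

    `split_watermarks` maps a split id to its current watermark in milliseconds.
    """
    if not split_watermarks:
        return set()
    # Splits may run ahead of the slowest split by at most `max_drift_ms`.
    limit = min(split_watermarks.values()) + max_drift_ms
    return {split for split, wm in split_watermarks.items() if wm > limit}


if __name__ == "__main__":
    print(splits_to_pause({"p0": 1000, "p1": 1500, "p2": 9000}, max_drift_ms=1000))
    print(splits_to_pause({"p0": 8500, "p1": 8200, "p2": 9000}, max_drift_ms=1000))
```

In the first call only `p2` is paused. Once the other splits have caught up, as in the second call, nothing is paused and all splits emit data again.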
Streaming FileSink Expansion #
Following the addition of ABFS support, the FileSink is now able to function in streaming mode with a total of five different filesystems: HDFS, S3, OSS, ABFS, and Local. This expansion effectively covers the majority of main filesystems, providing a comprehensive range of options and increased versatility for users.
RocksDBStateBackend Upgrade #
This release has updated the version of FRocksDB to 6.20.3-ververica-2.0 which brings improvements for RocksDBStateBackend:
- Support building FRocksDB Java on Apple Silicon chipsets, such as Mac M1 and M2
- Improve the performance of compaction filter by avoiding expensive ToString()
- Upgrade ZLIB version of FRocksDB to avoid memory corruption
- Add periodic_compaction_seconds option to RocksJava
Please see FLINK-30836 for more details.
This release also widens the scope of sharing memory between slots to the TaskManager, which can help to increase memory efficiency if the memory usage of slots in a TaskManager is uneven. Furthermore, after tuning, it can reduce the overall memory consumption at the expense of resource isolation. See the state.backend.rocksdb.memory.fixed-per-tm configuration option for details.
Calcite Upgrade #
Flink 1.17 is upgraded to Calcite version 1.29.0 to improve the performance and efficiency of the Flink SQL system. Flink 1.16 uses Calcite 1.26.0 which has severe issues with RexNode simplification caused by the SEARCH operator. This leads to wrong data from query optimization as reported in CALCITE-4325 and CALCITE-4352. By upgrading the version of Calcite, Flink can take advantage of its improved performance and new features in Flink SQL processing. This resolves multiple bugs and leads to faster query processing times.
Others #
PyFlink #
The Flink 1.17 release includes updates to PyFlink, the Python interface for Apache Flink. Notable improvements include support for Python 3.10 and execution capabilities on Apple Silicon chipsets, such as the Mac M1 and M2 computers. Additionally, the release includes minor optimizations that enhance cross-process communication stability between Java and Python processes, enable the specification of data types of Python UDFs via strings to improve usability, and support access to job parameters in Python UDFs. This release focuses on improving PyFlink’s functionality and usability, rather than introducing new major features. However, these enhancements are expected to improve the user experience and facilitate efficient data processing.
Daily Performance Benchmark #
In Flink 1.17, daily performance monitoring has been integrated into the #flink-dev-benchmarks Slack channel. This feature is crucial in quickly identifying regressions and ensuring the quality of the code. Once a regression is identified through the Slack channel or the speed center, developers can refer to the guidance provided in the Benchmark’s wiki to address the issue effectively. This feature helps the community take a proactive approach in ensuring system performance, resulting in a better product and increased user satisfaction.
Subtask Level Flame Graph #
Starting with Flink 1.17, Flame Graph provides “drill down” visualizations to the task level, which allows users to gain a more detailed understanding of the performance of their tasks. This feature is a significant improvement over previous versions of Flame Graph, as it empowers users to select a subtask of interest and see the corresponding flame graph. By doing so, users can identify specific areas where their tasks may be experiencing performance issues and take steps to address them. This can lead to significant improvements in the overall efficiency and effectiveness of their data processing pipelines.
Generalized Delegation Token Support #
Previously, Flink supported Kerberos authentication and Hadoop based tokens. With FLIP-272 being finalized, Flink’s delegation token framework is generalized to make it authentication protocol agnostic. This will allow contributors in the future to add support for non-Hadoop compliant frameworks where the authentication protocol is not based on Kerberos. Additionally, FLIP-211 was implemented which improves Flink’s interactions with Kerberos: It reduces the number of requests that are necessary to exchange delegation tokens in Flink.
Upgrade Notes #
The Flink community tries to ensure that upgrades are as seamless as possible. However, certain changes may require users to make adjustments to certain parts of the program when upgrading to version 1.17. Please refer to the release notes for a comprehensive list of adjustments to make and issues to check during the upgrading process.
List of Contributors #
The Apache Flink community would like to express gratitude to all the contributors who made this release possible:
Ahmed Hamdy, Aitozi, Aleksandr Pilipenko, Alexander Fedulov, Alexander Preuß, Anton Kalashnikov, Arvid Heise, Bo Cui, Brayno, Carlos Castro, ChangZhuo Chen (陳昌倬), Chen Qin, Chesnay Schepler, Clemens, ConradJam, Danny Cranmer, Dawid Wysakowicz, Dian Fu, Dong Lin, Dongjoon Hyun, Elphas Toringepi, Eric Xiao, Fabian Paul, Ferenc Csaky, Gabor Somogyi, Gen Luo, Gunnar Morling, Gyula Fora, Hangxiang Yu, Hong Liang Teoh, HuangXingBo, Jacky Lau, Jane Chan, Jark Wu, Jiale, Jin, Jing Ge, Jinzhong Li, Joao Boto, John Roesler, Jun He, JunRuiLee, Junrui Lee, Juntao Hu, Krzysztof Chmielewski, Leonard Xu, Licho, Lijie Wang, Mark Canlas, Martijn Visser, MartijnVisser, Martin Liu, Marton Balassi, Mason Chen, Matt, Matthias Pohl, Maximilian Michels, Mingliang Liu, Mulavar, Nico Kruber, Noah, Paul Lin, Peter Huang, Piotr Nowojski, Qing Lim, QingWei, Qingsheng Ren, Rakesh, Ran Tao, Robert Metzger, Roc Marshal, Roman Khachatryan, Ron, Rui Fan, Ryan Skraba, Salva Alcántara, Samrat, Samrat Deb, Samrat002, Sebastian Mattheis, Sergey Nuyanzin, Seth Saperstein, Shengkai, Shuiqiang Chen, Smirnov Alexander, Sriram Ganesh, Steven van Rossum, Tartarus0zm, Timo Walther, Venkata krishnan Sowrirajan, Wei Zhong, Weihua Hu, Weijie Guo, Xianxun Ye, Xintong Song, Yash Mayya, YasuoStudyJava, Yu Chen, Yubin Li, Yufan Sheng, Yun Gao, Yun Tang, Yuxin Tan, Zakelly, Zhanghao Chen, Zhenqiu Huang, Zhu Zhu, ZmmBigdata, bzhaoopenstack, chengshuo.cs, chenxujun, chenyuzhi, chenyuzhi459, chenzihao, dependabot[bot], fanrui, fengli, frankeshi, fredia, godfreyhe, gongzhongqiang, harker2015, hehuiyuan, hiscat, huangxingbo, hunter-cloud09, ifndef-SleePy, jeremyber-aws, jiangjiguang, jingge, kevin.cyj, kristoffSC, kurt, laughingman7743, libowen, lincoln lee, lincoln.lil, liujiangang, liujingmao, liuyongvs, liuzhuang2017, luoyuxia, mas-chen, moqimoqidea, muggleChen, noelo, ouyangwulin, ramkrish86, saikikun, sammieliu, shihong90, shuiqiangchen, snuyanzin, sunxia, sxnan, tison, todd5167, tonyzhu918, wangfeifan, wenbingshen, 
xuyang, yiksanchan, yunfengzhou-hub, yunhong, yuxia Luo, yuzelin, zhangjingcun, zhangmang, zhengyunhong.zyh, zhouli, zoucao, 沈嘉琦