No Java Required: Configuring Sources and Sinks in SQL

February 20, 2020 - Seth Wiesman (@sjwiesman)

Introduction #

The recent Apache Flink 1.10 release includes many exciting features. In particular, it marks the end of the community’s year-long effort to merge in the Blink SQL contribution from Alibaba. The reason the community chose to spend so much time on the contribution is that SQL works. It allows Flink to offer a truly unified interface over batch and streaming and makes stream processing accessible to a broad audience of developers and analysts. Best of all, Flink SQL is ANSI-SQL compliant, which means if you’ve ever used a database in the past, you already know it¹!

Much of that work focused on improving runtime performance and progressively extending Flink SQL’s coverage of the SQL standard. Flink now supports the full TPC-DS query set for batch queries, reflecting the readiness of its SQL engine to address the needs of modern data warehouse-like workloads. Its streaming SQL supports an almost equal set of features - those that are well defined on a streaming runtime - including complex joins and MATCH_RECOGNIZE.
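
To give a flavor of that streaming feature set, here is a minimal MATCH_RECOGNIZE sketch. It assumes a hypothetical ticker table with columns symbol, price, and an event-time attribute rowtime; the table and pattern are illustrative only and are not part of the release examples.

-- Illustrative sketch over a hypothetical `ticker` table
-- (symbol STRING, price DOUBLE, rowtime TIMESTAMP(3) with a watermark).
-- For each symbol, it matches a run of prices below the starting
-- price, followed by a price that climbs back above it.
SELECT *
FROM ticker
MATCH_RECOGNIZE (
	PARTITION BY symbol
	ORDER BY rowtime
	MEASURES
		A.price       AS start_price,
		LAST(B.price) AS last_price_below_start,
		C.price       AS recovery_price
	ONE ROW PER MATCH
	AFTER MATCH SKIP PAST LAST ROW
	PATTERN (A B+ C)
	DEFINE
		B AS B.price < A.price,
		C AS C.price > A.price
) MR;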

As important as this work is, the community also strives to make these features generally accessible to the broadest audience possible. That is why the Flink community is excited to offer, in release 1.10, production-ready DDL syntax (e.g., CREATE TABLE, DROP TABLE) and a refactored catalog interface.

Accessing Your Data Where It Lives #

Flink does not store data at rest; it is a compute engine and requires other systems to consume input from and to write output to. Those who have used Flink’s DataStream API in the past will be familiar with connectors that allow for interacting with external systems. Flink has a vast connector ecosystem that includes all major message queues, filesystems, and databases.

If your favorite system does not have a connector maintained in the central Apache Flink repository, check out the flink packages website, which has a growing number of community-maintained components.

While these connectors are battle-tested and production-ready, they are written in Java and configured in code, which means they are not amenable to pure SQL or Table applications. For a holistic SQL experience, not only the queries but also the table definitions need to be written in SQL.

CREATE TABLE Statements #

While Flink SQL has long provided table abstractions atop some of Flink’s most popular connectors, configurations were not always so straightforward. Beginning in 1.10, Flink supports defining tables through CREATE TABLE statements. With this feature, users can now create logical tables, backed by various external systems, in pure SQL.

By defining tables in SQL, developers can write queries against logical schemas that are abstracted away from the underlying physical data store. Coupled with Flink SQL’s unified approach to batch and stream processing, Flink provides a straight line from discovery to production.

Users can define tables over static data sets, anything from a local CSV file to a full-fledged data lake or even Hive. Leveraging Flink’s efficient batch processing capabilities, they can perform ad-hoc queries searching for exciting insights. Once something interesting is identified, businesses can gain real-time and continuous insights by merely altering the table so that it is powered by a message queue such as Kafka. Because Flink guarantees SQL queries have unified semantics over batch and streaming, users can be confident that redeploying this query as a continuous streaming application over a message queue will output identical results.
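
As a sketch of that discovery phase, the table below defines the same logical orders schema over a local CSV file using the filesystem connector available in Flink 1.10. The file path is hypothetical, and the exact format properties may need adjusting for your environment.

-- A hypothetical orders table backed by a local CSV file,
-- useful for ad-hoc batch exploration before switching to Kafka.
-- The path below is a placeholder.
CREATE TABLE orders_csv (
	user_id    BIGINT,
	product    STRING,
	order_time TIMESTAMP(3)
) WITH (
	'connector.type' = 'filesystem',
	'connector.path' = 'file:///tmp/orders.csv',
	'format.type'    = 'csv'
);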

-- Define a table called orders that is backed by a Kafka topic
-- The definition includes all relevant Kafka properties,
-- the underlying format (JSON) and even defines a
-- watermarking algorithm based on one of the fields
-- so that this table can be used with event time.
CREATE TABLE orders (
	user_id    BIGINT,
	product    STRING,
	order_time TIMESTAMP(3),
	WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
	'connector.type'    	 = 'kafka',
	'connector.version' 	 = 'universal',
	'connector.topic'   	 = 'orders',
	'connector.startup-mode' = 'earliest-offset',
	'connector.properties.bootstrap.servers' = 'localhost:9092',
	'format.type' = 'json' 
);

-- Define a table called product_analysis
-- on top of Elasticsearch 7 where we
-- can write the results of our query. 
CREATE TABLE product_analysis (
	product 	STRING,
	tracking_time 	TIMESTAMP(3),
	units_sold 	BIGINT
) WITH (
	'connector.type'    = 'elasticsearch',
	'connector.version' = '7',
	'connector.hosts'   = 'http://localhost:9200',
	'connector.index'   = 'product_analysis',
	'connector.document-type' = 'analysis'
);

-- A simple query that analyzes order data
-- from Kafka and writes results into 
-- Elasticsearch.
INSERT INTO product_analysis
SELECT
	product,
	TUMBLE_START(order_time, INTERVAL '1' DAY) as tracking_time,
	COUNT(*) as units_sold
FROM orders
GROUP BY
	product,
	TUMBLE(order_time, INTERVAL '1' DAY);

Catalogs #

While being able to create tables is important, it often isn’t enough. A business analyst, for example, shouldn’t have to know which properties to set for Kafka, or even what the underlying data source is, just to write a query.

To solve this problem, Flink 1.10 also ships with a revamped catalog system for managing metadata about tables and user-defined functions. With catalogs, users can create tables once and reuse them across jobs and sessions. Now, the team managing a data set can create a table and immediately make it accessible to other groups within their organization.

The most notable catalog that Flink integrates with today is the Hive Metastore. The Hive catalog allows Flink to fully interoperate with Hive and serve as a more efficient query engine. Flink supports reading and writing Hive tables, using Hive UDFs, and even leveraging Hive’s metastore catalog to persist Flink-specific metadata.
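
As a sketch, once a Hive catalog has been registered - in Flink 1.10 this typically happens through the SQL Client’s YAML configuration or the Table API rather than in SQL - a session can switch to it and reuse the tables it already contains. The catalog name myhive below is an assumption.

-- Hypothetical session against a previously registered Hive catalog.
-- The catalog name `myhive` is an assumption; registering the catalog
-- itself is done outside of SQL in Flink 1.10.
SHOW CATALOGS;
USE CATALOG myhive;
SHOW TABLES;
-- Tables created in this catalog are persisted in the Hive Metastore
-- and are visible to any other Flink job or session that uses it.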

Looking Ahead #

Flink SQL has made enormous strides to democratize stream processing, and 1.10 marks a significant milestone in that development. However, we are not ones to rest on our laurels, and the community is committed to raising the bar on standards while lowering the barriers to entry. The community is looking to add more catalogs, such as JDBC and Apache Pulsar. We encourage you to sign up for the mailing list and stay on top of the announcements and new features in upcoming releases.



  1. My colleague Timo, who has worked on Flink SQL from the beginning, has the entire SQL standard printed on his desk and references it before any changes are merged. It’s enormous. ↩︎