February 2015 in the Flink community

March 2, 2015

February might be the shortest month of the year, but the Flink community has still been busy adding features to the system and fixing bugs. Here’s a rundown of the community’s activity last month.

0.8.1 release

Flink 0.8.1 was released. This bug-fix release resolves a total of 22 issues.

New committer

Max Michels has been voted a committer by the Flink PMC.

Flink adapter for Apache SAMOA

Apache SAMOA (incubating) is a distributed streaming machine learning (ML) framework with a programming abstraction for distributed streaming ML algorithms. SAMOA runs on a variety of backend engines, currently Apache Storm and Apache S4. A pull request that adds a Flink adapter for SAMOA is available in the SAMOA repository.

Flink on Google Compute Engine

Flink is now integrated into bdutil, Google’s open source tool for creating and configuring (Hadoop) clusters in Google Compute Engine. Deployment of Flink clusters is supported starting with bdutil 1.2.0.

Flink on the web

A new blog post on Flink Streaming was published on the Flink blog. Flink was also mentioned in several articles on the web. Here are some examples:

In the Flink master

The following features have now been merged into Flink’s master repository.

Gelly

Gelly, Flink’s Graph API, allows users to manipulate graph-shaped data directly. For example, here is how to compute single-source shortest paths in a graph:

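// Build a graph from DataSets of vertices and edges
Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);

// Run the library algorithm and keep the vertices, whose values hold the computed distances
DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
     .run(new SingleSourceShortestPaths<Long>(srcVertexId, maxIterations))
     .getVertices();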

See more Gelly examples here.
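The following minimal plain-Java sketch makes concrete what the Gelly snippet computes: single-source shortest paths with a bounded number of rounds. It is not Gelly’s implementation and uses no Flink classes. The `Edge` type and the `shortestPaths` method are hypothetical; only the `srcVertexId` and `maxIterations` parameter names come from the example. Each round relaxes every edge against the previous round’s distances, roughly like one superstep of a vertex-centric iteration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of bounded single-source shortest paths (Bellman-Ford style).
// NOT Gelly's implementation; Edge and shortestPaths are hypothetical names.
public class ShortestPathsSketch {

    // A directed, weighted edge.
    static final class Edge {
        final long src;
        final long trg;
        final double weight;

        Edge(long src, long trg, double weight) {
            this.src = src;
            this.trg = trg;
            this.weight = weight;
        }
    }

    // Returns the best distance found from srcVertexId to every vertex after at most
    // maxIterations rounds; unreachable vertices keep Double.POSITIVE_INFINITY.
    // Assumes every edge endpoint appears in the vertices list.
    static Map<Long, Double> shortestPaths(List<Long> vertices, List<Edge> edges,
                                           long srcVertexId, int maxIterations) {
        Map<Long, Double> dist = new HashMap<>();
        for (long v : vertices) {
            dist.put(v, v == srcVertexId ? 0.0 : Double.POSITIVE_INFINITY);
        }
        for (int i = 0; i < maxIterations; i++) {
            // Updates are computed from the previous round's distances only,
            // so each round extends candidate paths by at most one edge.
            Map<Long, Double> next = new HashMap<>(dist);
            boolean changed = false;
            for (Edge e : edges) {
                double candidate = dist.get(e.src) + e.weight;
                if (candidate < next.get(e.trg)) {
                    next.put(e.trg, candidate);
                    changed = true;
                }
            }
            dist = next;
            if (!changed) {
                break; // converged before reaching maxIterations
            }
        }
        return dist;
    }

    public static void main(String[] args) {
        List<Edge> edges = new ArrayList<>();
        edges.add(new Edge(1L, 2L, 1.0));
        edges.add(new Edge(2L, 3L, 2.0));
        edges.add(new Edge(1L, 3L, 5.0));
        System.out.println(shortestPaths(List.of(1L, 2L, 3L, 4L), edges, 1L, 10));
    }
}
```

Each round adds at most one edge to the paths considered, so a small `maxIterations` can leave some distances above their true shortest-path values. The cap matters on large graphs for that reason.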

The flink-table module

The newly merged flink-table module is the first step in Flink’s roadmap towards logical queries and SQL support. Here’s a preview of how you can read two CSV files, assign a logical schema to them, and apply transformations such as filters and joins using logical attributes rather than physical data types.

val customers = getCustomerDataSet(env)
 .as('id, 'mktSegment)
 .filter( 'mktSegment === "AUTOMOBILE" )

val orders = getOrdersDataSet(env)
 .filter( o => dateFormat.parse(o.orderDate).before(date) )
 .as('orderId, 'custId, 'orderDate, 'shipPrio)

val items =
 orders.join(customers)
   .where('custId === 'id)
   .select('orderId, 'orderDate, 'shipPrio)
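To clarify the semantics, the following plain-Java sketch computes the same result as the query above over in-memory lists. It keeps AUTOMOBILE customers and orders placed before a cutoff date, joins them on `custId == id`, and projects `orderId`, `orderDate`, and `shipPrio`. It does not use the flink-table API. The class names are hypothetical, and two details are assumptions: the order date is a `LocalDate` (the original parses a string with `dateFormat`), and the ship priority is an `int`.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of what the query computes; NOT the flink-table API.
// All class and field names here are hypothetical.
public class OrderQuerySketch {

    static final class Customer {
        final long id;
        final String mktSegment;

        Customer(long id, String mktSegment) {
            this.id = id;
            this.mktSegment = mktSegment;
        }
    }

    static final class Order {
        final long orderId;
        final long custId;
        final LocalDate orderDate; // assumption: already parsed, unlike the original string
        final int shipPrio;

        Order(long orderId, long custId, LocalDate orderDate, int shipPrio) {
            this.orderId = orderId;
            this.custId = custId;
            this.orderDate = orderDate;
            this.shipPrio = shipPrio;
        }
    }

    // Projected output row: (orderId, orderDate, shipPrio).
    static final class Result {
        final long orderId;
        final LocalDate orderDate;
        final int shipPrio;

        Result(long orderId, LocalDate orderDate, int shipPrio) {
            this.orderId = orderId;
            this.orderDate = orderDate;
            this.shipPrio = shipPrio;
        }
    }

    static List<Result> query(List<Customer> customers, List<Order> orders, LocalDate date) {
        // Build side: keep AUTOMOBILE customers and index them by id.
        Map<Long, List<Customer>> byId = new HashMap<>();
        for (Customer c : customers) {
            if ("AUTOMOBILE".equals(c.mktSegment)) {
                byId.computeIfAbsent(c.id, k -> new ArrayList<>()).add(c);
            }
        }
        // Probe side: keep orders placed before the cutoff, then join on custId == id.
        List<Result> out = new ArrayList<>();
        for (Order o : orders) {
            if (!o.orderDate.isBefore(date)) {
                continue;
            }
            for (Customer match : byId.getOrDefault(o.custId, List.of())) {
                // One output row per matching customer, with only the selected fields.
                out.add(new Result(o.orderId, o.orderDate, o.shipPrio));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Customer> customers = List.of(new Customer(1L, "AUTOMOBILE"), new Customer(2L, "BUILDING"));
        List<Order> orders = List.of(
            new Order(10L, 1L, LocalDate.of(1995, 3, 1), 0),
            new Order(11L, 2L, LocalDate.of(1995, 3, 1), 0));
        for (Result r : query(customers, orders, LocalDate.of(1995, 3, 15))) {
            System.out.println(r.orderId + " " + r.orderDate + " " + r.shipPrio);
        }
    }
}
```

The sketch evaluates the equi-join as a hash join: it builds a table on the filtered customers and probes it with the filtered orders. A system executing the logical query is free to pick a different join strategy.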

Access to HCatalog tables

With the flink-hcatalog module, you can now conveniently access HCatalog/Hive tables. The module supports projection (selection and order of fields) and partition filters.

Access to secured YARN clusters/HDFS

With this change, users can access Kerberos-secured YARN (and HDFS) Hadoop clusters. Basic support for accessing secured HDFS from a standalone Flink setup is also available.