Apache Flink – Table API and SQL

The Table API is a relational API with an SQL-like expression language. It can handle both batch and stream processing, and it can be embedded with the Java and Scala DataSet and DataStream APIs. You can create tables from existing DataSets and DataStreams or from external data sources. Through this relational API, you can perform operations like join, aggregate, select and filter. Whether the input is batch or stream, the semantics of the query remain the same.

Here is a sample Table API program −

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a Table
tableEnv.registerTable("table1", …)
// or
tableEnv.registerTableSource("table2", …)
// or
tableEnv.registerExternalCatalog("extCat", …)

// register an output Table
tableEnv.registerTableSink("outputTable", …)

// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(…)

// create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT … FROM table2 …")

// emit a Table API result Table to a TableSink; same for a SQL result
tapiResult.insertInto("outputTable")

// execute
env.execute()
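The snippet above is Scala; since the rest of this tutorial uses Java, below is a minimal, self-contained sketch of the same idea in Java. It is an illustrative example only (the class name TableWordCount and the table name Words are our own), and it assumes Flink 1.7.x with the flink-table dependency on the classpath −

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableWordCount {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

      // turn a small in-memory stream into a Table with a single column named "word"
      DataStream<String> words = env.fromElements("flink", "table", "flink");
      tableEnv.registerTable("Words", tableEnv.fromDataStream(words, "word"));

      // the same query expressed in SQL; a grouped aggregation on a stream
      // produces a retract stream (an add/delete flag plus the row)
      Table result = tableEnv.sqlQuery(
         "SELECT word, COUNT(1) AS cnt FROM Words GROUP BY word");
      tableEnv.toRetractStream(result, Row.class).print();

      env.execute("Table API WordCount");
   }
}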
Apache Flink – Creating a Flink Application

In this chapter, we will learn how to create a Flink application.

Open Eclipse IDE, click on New Project and select Java Project.

Give the project a name and click on Finish.

Now, click on Finish as shown in the following screenshot.

Now, right-click on src and go to New >> Class.

Give a class name and click on Finish.

Copy and paste the below code in the editor.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCount {
   // *************************************************************************
   // PROGRAM
   // *************************************************************************
   public static void main(String[] args) throws Exception {
      final ParameterTool params = ParameterTool.fromArgs(args);

      // set up the execution environment
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // make parameters available in the web interface
      env.getConfig().setGlobalJobParameters(params);

      // get input data
      DataSet<String> text = env.readTextFile(params.get("input"));

      DataSet<Tuple2<String, Integer>> counts =
         // split up the lines into pairs (2-tuples) containing: (word, 1)
         text.flatMap(new Tokenizer())
         // group by the tuple field "0" and sum up tuple field "1"
         .groupBy(0)
         .sum(1);

      // emit result
      if (params.has("output")) {
         counts.writeAsCsv(params.get("output"), "\n", " ");
         // execute program
         env.execute("WordCount Example");
      } else {
         System.out.println("Printing result to stdout. Use --output to specify output path.");
         counts.print();
      }
   }

   // *************************************************************************
   // USER FUNCTIONS
   // *************************************************************************
   public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
         // normalize and split the line
         String[] tokens = value.toLowerCase().split("\\W+");

         // emit the pairs
         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<>(token, 1));
            }
         }
      }
   }
}

You will get many errors in the editor, because the Flink libraries need to be added to this project.

Right-click on the project >> Build Path >> Configure Build Path.

Select the Libraries tab and click on Add External JARs. Go to Flink's lib directory, select all 4 libraries and click on OK.

Go to the Order and Export tab, select all the libraries and click on OK. You will see that the errors are gone.

Now, let us export this application. Right-click on the project and click on Export.

Select JAR file and click Next.

Give a destination path and click on Next.

Click on Next>.

Click on Browse, select the main class (WordCount) and click Finish.

Note − Click OK, in case you get any warning.

Run the below command. It will run the Flink application you just created.

./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output
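If you want to sanity-check the job before wiring up input files, one quick variant (an illustrative sketch, not part of the original tutorial; the class name is our own) is to feed in-memory data and reuse the Tokenizer defined above −

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WordCountSmokeTest {
   public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // a tiny in-memory dataset instead of readTextFile(...)
      DataSet<String> text = env.fromElements("to be or not to be");

      // reuse the Tokenizer from the WordCount class above;
      // print() triggers execution for the batch API, so no env.execute() is needed
      text.flatMap(new WordCount.Tokenizer()).groupBy(0).sum(1).print();
   }
}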
Apache Flink – Introduction

Apache Flink is a real-time processing framework which can process streaming data. It is an open-source stream-processing framework for high-performance, scalable, and accurate real-time applications. It has a true streaming model and does not take input data as batches or micro-batches.

Apache Flink was started by the data Artisans company and is now developed under the Apache License by the Apache Flink community. This community has over 479 contributors and 15500+ commits so far.

Ecosystem of Apache Flink

The diagram given below shows the different layers of the Apache Flink ecosystem −

Storage

Apache Flink has multiple options from where it can read/write data. Below is a basic storage list −

HDFS (Hadoop Distributed File System)
Local File System
S3
RDBMS (MySQL, Oracle, MS SQL etc.)
MongoDB
HBase
Apache Kafka
Apache Flume

Deploy

You can deploy Apache Flink in local mode, cluster mode or on the cloud. Cluster mode can be standalone, YARN or MESOS. On the cloud, Flink can be deployed on AWS or GCP.

Kernel

This is the runtime layer, which provides distributed processing, fault tolerance, reliability, native iterative processing capability and more.

APIs & Libraries

This is the top layer and the most important layer of Apache Flink. It has the DataSet API, which takes care of batch processing, and the DataStream API, which takes care of stream processing. There are other libraries like FlinkML (for machine learning), Gelly (for graph processing) and the Table API for SQL. This layer provides diverse capabilities to Apache Flink.
Apache Flink – API Concepts

Flink has a rich set of APIs using which developers can perform transformations on both batch and real-time data. The transformations include mapping, filtering, sorting, joining, grouping and aggregating. These transformations by Apache Flink are performed on distributed data. Let us discuss the different APIs Apache Flink offers.

Dataset API

The Dataset API in Apache Flink is used to perform batch operations on data collected over a period. This API can be used in Java, Scala and Python. It can apply different kinds of transformations on the datasets like filtering, mapping, aggregating, joining and grouping. Datasets are created from sources like local files or by reading a file from a particular source, and the result data can be written to different sinks like distributed files or the command line terminal. This API is supported by both the Java and Scala programming languages.

Here is a Wordcount program of the Dataset API −

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountProg {
   public static void main(String[] args) throws Exception {
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<String> text = env.fromElements(
         "Hello",
         "My Dataset API Flink Program");

      DataSet<Tuple2<String, Integer>> wordCounts = text
         .flatMap(new LineSplitter())
         .groupBy(0)
         .sum(1);

      wordCounts.print();
   }

   public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
         for (String word : line.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

DataStream API

This API is used for handling data in a continuous stream. You can perform various operations like filtering, mapping, windowing and aggregating on the stream data. There are various sources for this data stream like message queues, files and socket streams, and the result data can be written to different sinks like the command line terminal. Both the Java and Scala programming languages support this API.

Here is a streaming Wordcount program of the DataStream API, where you have a continuous stream of word counts and the data is grouped into 5-second windows.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCountProg {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      DataStream<Tuple2<String, Integer>> dataStream = env
         .socketTextStream("localhost", 9999)
         .flatMap(new Splitter())
         .keyBy(0)
         .timeWindow(Time.seconds(5))
         .sum(1);

      dataStream.print();

      env.execute("Streaming WordCount Example");
   }

   public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
         for (String word : sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

To feed this program, open a local socket with netcat (for example, nc -lk 9999) and type sentences into that terminal; the windowed counts appear in the job's standard output every five seconds.
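If you do not have a socket handy, a minimal variant (an illustrative sketch, not from the original tutorial; the class name is our own) swaps the socket source for a bounded in-memory stream and drops the window, reusing the Splitter defined above −

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class QuickStreamWordCount {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      env.fromElements("hello flink", "hello stream")
         // reuse the Splitter defined in WindowWordCountProg above
         .flatMap(new WindowWordCountProg.Splitter())
         .keyBy(0)
         .sum(1)   // running count per word, no windowing
         .print();

      env.execute("Quick Streaming WordCount");
   }
}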
Apache Flink – Use Cases

In this chapter, we will understand a few use cases of Apache Flink.

Apache Flink − Bouygues Telecom

Bouygues Telecom is one of the largest telecom organizations in France. It has 11+ million mobile subscribers and 2.5+ million fixed customers. Bouygues heard about Apache Flink for the first time at a Hadoop Group Meeting held in Paris. Since then, they have been using Flink for multiple use cases. They have been processing billions of messages a day in real time through Apache Flink.

This is what Bouygues has to say about Apache Flink: "We ended up with Flink because the system supports true streaming - both at the API and at the runtime level, giving us the programmability and low latency that we were looking for. In addition, we were able to get our system up and running with Flink in a fraction of the time compared to other solutions, which resulted in more available developer resources for expanding the business logic in the system."

At Bouygues, customer experience is the highest priority. They analyse data in real time so that they can give the below insights to their engineers −

Real-time customer experience over their network
What is happening globally on the network
Network evaluations and operations

They created a system called LUX (Logged User Experience) which processes massive log data from network equipment together with internal reference data to give quality-of-experience indicators that log their customer experience, with an alarming functionality to detect any failure in the consumption of data within 60 seconds. To achieve this, they needed a framework which can take massive data in real time, is easy to set up and provides a rich set of APIs for processing the streamed data. Apache Flink was a perfect fit for Bouygues Telecom.

Apache Flink − Alibaba

Alibaba is the largest e-commerce retail company in the world, with $394 billion revenue in 2015. Alibaba search is the entry point for all customers; it shows all the search results and recommendations accordingly. Alibaba uses Apache Flink in its search engine to show results in real time with the highest accuracy and relevancy for each user.

Alibaba was looking for a framework which −

Is very agile in maintaining one codebase for their entire search infrastructure process.
Provides low latency for the availability changes in the products on the website.
Is consistent and cost-effective.

Apache Flink qualified for all the above requirements. They needed a framework with a single processing engine that can process both batch and stream data with the same engine, and that is what Apache Flink does. They also use Blink, a forked version of Flink, to meet some unique requirements for their search. They are also using Apache Flink's Table API with a few improvements for their search.

This is what Alibaba had to say about Apache Flink: "Looking back, it was no doubt a huge year for Blink and Flink at Alibaba. No one thought that we would make this much progress in a year, and we are very grateful to all the people who helped us in the community. Flink is proven to work at the very large scale. We are more committed than ever to continue our work with the community to move Flink forward!"
Apache Flink – Big Data Platform

The advancement of data in the last 10 years has been enormous; this gave rise to the term "Big Data". There is no fixed size of data which you can call Big Data; any data that your traditional system (RDBMS) is not able to handle is Big Data. This Big Data can be in structured, semi-structured or unstructured format.

Initially, there were three dimensions to data − Volume, Velocity and Variety. The dimensions have now gone beyond just the three Vs. We have now added other Vs − Veracity, Validity, Vulnerability, Value, Variability, etc.

Big Data led to the emergence of multiple tools and frameworks that help in the storage and processing of data. There are a few popular big data frameworks such as Hadoop, Spark, Hive, Pig, Storm and Zookeeper. It also gave the opportunity to create next-gen products in multiple domains like healthcare, finance, retail, e-commerce and more.

Whether it is an MNC or a start-up, everyone is leveraging Big Data to store and process data and make smarter decisions.
Apache Flink – Batch vs Real-time Processing

In terms of Big Data, there are two types of processing −

Batch Processing
Real-time Processing

Processing based on data collected over time is called batch processing. For example, a bank manager wants to process the past one month's data (collected over time) to know the number of cheques that got cancelled in the past 1 month.

Processing based on immediate data for an instant result is called real-time processing. For example, a bank manager gets a fraud alert immediately after a fraud transaction (instant result) has occurred.

The table given below lists the differences between batch and real-time processing −

Batch Processing                                   Real-Time Processing
Static files                                       Event streams
Processed periodically (every minute, hour, day)   Processed immediately, with sub-second latency
Past data on disk storage                          In-memory storage
Example − bill generation                          Example − ATM transaction alert

These days, real-time processing is used a lot in every organization. Use cases like fraud detection, real-time alerts in healthcare and network attack alerts require real-time processing of instant data; a delay of even a few milliseconds can have a huge impact. An ideal tool for such real-time use cases would be one which can take in data as a stream and not as a batch, as the sketch after this paragraph illustrates. Apache Flink is that real-time processing tool.
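To make the distinction concrete in code, here is a hedged sketch (our own illustration, not from the original text) contrasting Flink's two entry points; the file name, host and port are placeholders −

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchVsStreaming {
   // Batch: the job reads a finite, static file, processes it, and terminates.
   static void batch() throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      env.readTextFile("input.txt").print(); // print() triggers execution for DataSets
   }

   // Streaming: the job subscribes to an unbounded source and runs until cancelled.
   static void streaming() throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.socketTextStream("localhost", 9999).print();
      env.execute("Streaming job");
   }

   public static void main(String[] args) throws Exception {
      batch();        // runs once over the static file and finishes
      // streaming(); // would run indefinitely, so it is left commented out here
   }
}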
Apache Flink Tutorial

Apache Flink is an open-source stream-processing framework for high-performance, scalable, and accurate real-time applications. It can process both batch and streaming data with a single engine. The examples provided in this tutorial have been developed using Apache Flink.

Audience

This tutorial is intended for those who want to learn Apache Flink. Apache Flink is used to process huge volumes of data at lightning-fast speed, and its Table API lets you apply traditional SQL knowledge.

Prerequisites

To make the most of this tutorial, you should have a good understanding of the basics of Hadoop and HDFS commands. It is also recommended to have a basic knowledge of SQL before going through this tutorial.
Apache Flink – Running a Flink Program

In this chapter, we will learn how to run a Flink program.

Let us run the Flink wordcount example on a Flink cluster.

Go to Flink's home directory and run the below command in the terminal.

bin/flink run examples/batch/WordCount.jar -input README.txt -output /home/ubuntu/flink-1.7.1/output.txt

Go to the Flink dashboard; you will be able to see a completed job with its details. If you click on Completed Jobs, you will get a detailed overview of the jobs.

To check the output of the wordcount program, run the below command in the terminal.

cat output.txt