Apache Kafka – Cluster Architecture

A Kafka cluster is made up of the following components.

Broker − A Kafka cluster typically consists of multiple brokers to maintain load balance. Kafka brokers are stateless, so they use ZooKeeper to maintain the cluster state. A single Kafka broker instance can handle hundreds of thousands of reads and writes per second, and each broker can handle terabytes of messages without performance impact. Kafka broker leader election is performed by ZooKeeper.

ZooKeeper − ZooKeeper is used for managing and coordinating the Kafka brokers. The ZooKeeper service mainly notifies producers and consumers about the presence of a new broker in the Kafka system or the failure of an existing broker. Based on these notifications, producers and consumers decide how to proceed and start coordinating their work with another broker.

Producers − Producers push data to brokers. When a new broker is started, all producers look it up and automatically send messages to it. A Kafka producer does not wait for acknowledgements from the broker and sends messages as fast as the broker can handle.

Consumers − Since Kafka brokers are stateless, the consumer has to track how many messages it has consumed by using the partition offset. If the consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues an asynchronous pull request to the broker to have a buffer of bytes ready to consume. Consumers can rewind or skip to any point in a partition simply by supplying an offset value. The consumer offset value is notified by ZooKeeper.
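To make the offset mechanics concrete, here is a minimal, hedged Java sketch (not part of the original text). It assumes a topic named Hello-Kafka with a single partition and a broker on localhost:9092, and it uses the newer KafkaConsumer client, which tracks offsets through Kafka itself rather than ZooKeeper; the idea of consumer-controlled offsets is the same.

// Minimal sketch (assumptions: topic "Hello-Kafka" with one partition, broker at
// localhost:9092). The consumer controls its own position in the partition.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetRewindConsumer {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "offset-demo");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
         TopicPartition partition = new TopicPartition("Hello-Kafka", 0);
         consumer.assign(Collections.singletonList(partition));

         // Rewind to offset 0: a consumer may skip to any offset it chooses.
         consumer.seek(partition, 0);

         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
         }
         // Committing an offset acknowledges everything consumed up to that point.
         consumer.commitSync();
      }
   }
}

Run after producing a few messages to the topic; the program prints each record together with the offset it was read from.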

Integration With Storm

In this chapter, we will learn how to integrate Kafka with Apache Storm.

About Storm

Storm was originally created by Nathan Marz and the team at BackType. In a short time, Apache Storm became a standard for distributed real-time processing, allowing you to process a huge volume of data. Storm is very fast; a benchmark clocked it at over a million tuples processed per second per node. Apache Storm runs continuously, consuming data from the configured sources (Spouts) and passing the data down the processing pipeline (Bolts). Combined, Spouts and Bolts make a Topology.

Integration with Storm

Kafka and Storm naturally complement each other, and their powerful cooperation enables real-time streaming analytics for fast-moving big data. The Kafka-Storm integration makes it easier for developers to ingest and publish data streams from Storm topologies.

Conceptual flow

A spout is a source of streams. For example, a spout may read tuples off a Kafka topic and emit them as a stream. A bolt consumes input streams, processes them, and possibly emits new streams. Bolts can do anything from running functions and filtering tuples to performing streaming aggregations, streaming joins, talking to databases, and more. Each node in a Storm topology executes in parallel. A topology runs indefinitely until you terminate it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if machines go down and messages are dropped.

Let us go through the Kafka-Storm integration APIs in detail. There are three main classes to integrate Kafka with Storm. They are as follows −

BrokerHosts – ZkHosts & StaticHosts

BrokerHosts is an interface, and ZkHosts and StaticHosts are its two main implementations. ZkHosts is used to track the Kafka brokers dynamically by maintaining the details in ZooKeeper, while StaticHosts is used to set the Kafka brokers and their details manually/statically. ZkHosts is the simple and fast way to access a Kafka broker. The signature of ZkHosts is as follows −

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

where brokerZkStr is the ZooKeeper host and brokerZkPath is the ZooKeeper path that maintains the Kafka broker details.

KafkaConfig API

This API is used to define configuration settings for the Kafka cluster. The signature of KafkaConfig is defined as follows −

public KafkaConfig(BrokerHosts hosts, String topic)

hosts − The BrokerHosts can be ZkHosts / StaticHosts.

topic − topic name.

SpoutConfig API

SpoutConfig is an extension of KafkaConfig that supports additional ZooKeeper information.

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)

hosts − The BrokerHosts can be any implementation of the BrokerHosts interface.

topic − topic name.

zkRoot − ZooKeeper root path.

id − The spout stores the state of the offsets it has consumed in ZooKeeper. The id should uniquely identify your spout.

SchemeAsMultiScheme

SchemeAsMultiScheme is an interface that dictates how the ByteBuffer consumed from Kafka gets transformed into a Storm tuple. It is derived from MultiScheme and accepts an implementation of the Scheme class. There are many implementations of the Scheme class, and one such implementation is StringScheme, which parses the bytes as a simple string. It also controls the naming of your output field. The signature is defined as follows −

public SchemeAsMultiScheme(Scheme scheme)

scheme − the scheme used to decode the byte buffer consumed from Kafka.
KafkaSpout API

KafkaSpout is our spout implementation, which will integrate with Storm. It fetches the messages from a Kafka topic and emits them into the Storm ecosystem as tuples. KafkaSpout gets its configuration details from SpoutConfig. Below is a sample code to create a simple Kafka spout.

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

// Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());

// Convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

// Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Bolt Creation

A bolt is a component that takes tuples as input, processes the tuples, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, SplitBolt and CountBolt, are used to perform the operations.

The IRichBolt interface has the following methods −

prepare − Provides the bolt with an environment to execute. The executors run this method to initialize the bolt.

execute − Processes a single tuple of input.

cleanup − Called when a bolt is going to shut down.

declareOutputFields − Declares the output schema of the tuple.

Let us create SplitBolt.java, which implements the logic to split a sentence into words, and CountBolt.java, which implements the logic to separate unique words and count their occurrences.

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");

      for(String word : words) {
         word = word.trim();

         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
      }

      collector.ack(input);
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt {
   Map<String, Integer> counters;
   private OutputCollector collector;

   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);

      if(!counters.containsKey(str)) {
         counters.put(str, 1);
      } else {
         Integer c = counters.get(str) + 1;
         counters.put(str, c);
      }

      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry : counters.entrySet()) {
         System.out.println(entry.getKey() + " : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Submitting to Topology

The Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple and easy methods to create complex topologies. The TopologyBuilder class has methods to set a spout (setSpout) and to set a bolt (setBolt). Finally, TopologyBuilder has createTopology to create the topology. The shuffleGrouping method, used when wiring a bolt to an upstream component, distributes the emitted tuples randomly and evenly across the tasks of the bolt, as in the sketch below.
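The following is a minimal, hedged sketch (not part of the original text) of how the spout and the two bolts above might be wired together and run locally. The class name, component ids, topic name and the LocalCluster-based submission are illustrative assumptions; the imports follow the older backtype.storm / storm.kafka package names used in the samples above (newer Storm releases moved these to org.apache.storm).

// KafkaStormWordCountTopology.java - illustrative sketch only.
import java.util.UUID;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaStormWordCountTopology {
   public static void main(String[] args) throws Exception {
      String zkConnString = "localhost:2181";   // assumed local ZooKeeper
      String topicName = "Hello-Kafka";         // assumed topic

      // Spout configuration, as described above.
      BrokerHosts hosts = new ZkHosts(zkConnString);
      SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName,
            "/" + topicName, UUID.randomUUID().toString());
      spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      // Wire spout -> SplitBolt -> CountBolt; shuffleGrouping spreads tuples
      // evenly across the tasks of each bolt.
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig));
      builder.setBolt("word-splitter", new SplitBolt()).shuffleGrouping("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-splitter");

      // Submit to an in-process cluster for local testing.
      Config config = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormWordCount", config, builder.createTopology());

      Thread.sleep(10000);
      cluster.shutdown();
   }
}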

Apache Kafka – Tools

The Kafka tools are packaged under org.apache.kafka.tools.*. The tools are categorized into system tools and replication tools.

System Tools

System tools can be run from the command line using the run-class script. The syntax is as follows −

bin/kafka-run-class.sh package.class --options

Some of the system tools are mentioned below −

Kafka Migration Tool − This tool is used to migrate a broker from one version to another.

Mirror Maker − This tool is used to mirror one Kafka cluster to another.

Consumer Offset Checker − This tool displays the consumer group, topic, partitions, offset, logSize and owner for the specified set of topics and consumer group.

Replication Tools

Kafka replication is a high-level design construct. Replication tools were added for stronger durability and higher availability. Some of the replication tools are mentioned below −

Create Topic Tool − This creates a topic with a default number of partitions and replication factor, and uses Kafka's default scheme to do replica assignment.

List Topic Tool − This tool lists the information for a given list of topics. If no topics are provided on the command line, the tool queries ZooKeeper to get all the topics and lists the information for them. The fields that the tool displays are topic name, partition, leader, replicas and isr.

Add Partition Tool − When a topic is created, the number of partitions for the topic has to be specified. Later on, more partitions may be needed for the topic when its volume increases. This tool helps to add more partitions for a specific topic and also allows manual replica assignment of the added partitions.

Apache Kafka – Basic Operations

First, let us start implementing the single node-single broker configuration, and we will then migrate our setup to the single node-multiple brokers configuration.

Hopefully you have installed Java, ZooKeeper and Kafka on your machine by now. Before moving to the Kafka cluster setup, you first need to start ZooKeeper, because the Kafka cluster uses ZooKeeper.

Start ZooKeeper

Open a new terminal and type the following command −

bin/zookeeper-server-start.sh config/zookeeper.properties

To start the Kafka broker, type the following command −

bin/kafka-server-start.sh config/server.properties

After starting the Kafka broker, type the command jps on the ZooKeeper terminal and you will see the following response −

821 QuorumPeerMain
928 Kafka
931 Jps

Now you can see two daemons running on the terminal, where QuorumPeerMain is the ZooKeeper daemon and the other one is the Kafka daemon.

Single Node-Single Broker Configuration

In this configuration, you have a single ZooKeeper instance and a single broker instance. Following are the steps to configure it −

Creating a Kafka Topic − Kafka provides a command line utility named kafka-topics.sh to create topics on the server. Open a new terminal and type the below example.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka

We just created a topic named Hello-Kafka with a single partition and a replication factor of one. The output will be similar to the following −

Output − Created topic Hello-Kafka

Once the topic has been created, you can see the notification in the Kafka broker terminal window, and the log for the created topic can be found under "/tmp/kafka-logs/", the log directory specified in the config/server.properties file.

List of Topics

To get a list of topics on the Kafka server, you can use the following command −

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

Since we have created only one topic, it lists Hello-Kafka only. If you create more than one topic, you will get all the topic names in the output.

Start Producer to Send Messages

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

From the above syntax, two main parameters are required for the producer command line client −

Broker-list − The list of brokers that we want to send the messages to. In this case we only have one broker. The config/server.properties file contains the broker port; since we know our broker is listening on port 9092, we can specify it directly.

Topic name − the name of the topic.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

The producer waits for input from stdin and publishes it to the Kafka cluster. By default, every new line is published as a new message. The default producer properties are specified in the config/producer.properties file. Now you can type a few lines of messages in the terminal as shown below.

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
[2016-01-16 13:50:45,931] WARN property topic is not valid (kafka.utils.VerifiableProperties)
Hello
My first message
My second message

Start Consumer to Receive Messages

Similar to the producer, the default consumer properties are specified in the config/consumer.properties file. Open a new terminal and type the below syntax for consuming messages.
Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-name --from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-Kafka --from-beginning

Output

Hello
My first message
My second message

Finally, you are able to enter messages from the producer's terminal and see them appearing in the consumer's terminal. As of now, you have a very good understanding of a single node cluster with a single broker. Let us now move on to the multiple brokers configuration.

Single Node-Multiple Brokers Configuration

Before moving on to the multiple brokers cluster setup, first start your ZooKeeper server.

Create Multiple Kafka Brokers − We already have one Kafka broker instance in config/server.properties. Now we need multiple broker instances, so copy the existing server.properties file into two new config files and rename them server-one.properties and server-two.properties. Then edit both new files and make the following changes −

config/server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config/server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start Multiple Brokers − After all the changes have been made to the three server property files, open three new terminals and start the brokers one by one.

Broker1
bin/kafka-server-start.sh config/server.properties

Broker2
bin/kafka-server-start.sh config/server-one.properties

Broker3
bin/kafka-server-start.sh config/server-two.properties

Now we have three different brokers running on the machine. Try it yourself: check all the daemons by typing jps on the ZooKeeper terminal and you will see the response.

Creating a Topic

Let us assign a replication factor of three for this topic because we have three different brokers running. If you have two brokers, the assigned replica value would be two.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic Multibrokerapplication

Output

created topic "Multibrokerapplication"

The Describe command is used to check which broker is listening on the currently created topic, as shown below −

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerapplication

Output

Topic:Multibrokerapplication    PartitionCount:1    ReplicationFactor:3    Configs:
   Topic:Multibrokerapplication    Partition:0    Leader:0    Replicas:0,2,1    Isr:0,2,1

From the above output, we can conclude that the first line gives a summary of all the partitions, showing the topic name, partition count and the replication factor that we chose. In the second line, each node is the leader for a randomly selected portion of the partitions. In our case, we see that our first broker (with broker.id 0) is the leader. Replicas:0,2,1 means that all the brokers replicate the topic, and Isr is the set of in-sync replicas, the subset of replicas that are currently alive and caught up with the leader.
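As a hedged illustration (not from the original tutorial), the following Java sketch publishes a few messages to the Multibrokerapplication topic created above. The class name and message contents are placeholders; bootstrap.servers simply lists the three local brokers on ports 9092, 9093 and 9094, although any one of them would be enough to bootstrap the client.

// Minimal sketch, assuming the three local brokers and the topic created above.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MultiBrokerProducer {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
      props.put("acks", "all");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<>(props);
      for (int i = 0; i < 10; i++) {
         // Each record is appended to the topic's single partition and
         // replicated to all three brokers (replication factor 3).
         producer.send(new ProducerRecord<>("Multibrokerapplication",
               Integer.toString(i), "message-" + i));
      }
      producer.close();
   }
}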

Real Time Application (Twitter)

Let us analyze a real-time application that gets the latest Twitter feeds and their hashtags. Earlier, we have seen the integration of Storm and Spark with Kafka. In both scenarios, we created a Kafka producer (using the CLI) to send messages to the Kafka ecosystem. Then, the Storm and Spark integrations read the messages using the Kafka consumer and inject them into the Storm and Spark ecosystems respectively. So, practically we need to create a Kafka producer, which should −

Read the Twitter feeds using the "Twitter Streaming API",
Process the feeds,
Extract the hashtags, and
Send them to Kafka.

Once the hashtags are received by Kafka, the Storm / Spark integration receives the information and sends it to the Storm / Spark ecosystem.

Twitter Streaming API

The "Twitter Streaming API" can be accessed in any programming language. "twitter4j" is an open source, unofficial Java library which provides a Java based module to easily access the "Twitter Streaming API". "twitter4j" provides a listener based framework to access the tweets. To access the "Twitter Streaming API", we need to sign up for a Twitter developer account and obtain the following OAuth authentication details −

ConsumerKey
ConsumerSecret
AccessToken
AccessTokenSecret

Once the developer account is created, download the "twitter4j" jar files and place them in the Java class path.

The complete Twitter Kafka producer code (KafkaTwitterProducer.java) is listed below −

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(1000);

      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key> <twitter-consumer-secret> <twitter-access-token> <twitter-access-token-secret> <topic-name> <twitter-search-keywords>");
         return;
      }

      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

      StatusListener listener = new StatusListener() {
         @Override
         public void onStatus(Status status) {
            queue.offer(status);
            // System.out.println("@" + status.getUser().getScreenName() + " - " + status.getText());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }

         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            // System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
         }

         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
         }

         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }

         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };

      twitterStream.addListener(listener);

      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);

      // Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<String, String>(props);

      int i = 0;
      int j = 0;

      while(i < 10) {
         Status ret = queue.poll();

         if (ret == null) {
            Thread.sleep(100);
            i++;
         } else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  topicName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }

      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

Compilation

Compile the application using the following command −

javac -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. KafkaTwitterProducer.java

Execution

Open two consoles. Run the above compiled application as shown below in one console.

java -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. KafkaTwitterProducer <twitter-consumer-key> <twitter-consumer-secret> <twitter-access-token> <twitter-access-token-secret> my-first-topic food

Run any one of the Spark / Storm applications explained in the previous chapters in another window. The main point to note is that the topic used should be the same in both cases. Here, we have used "my-first-topic" as the topic name.

Output

The output of this application will depend on the keywords and the current feed of Twitter. A sample output is specified below (Storm integration).

. . .
food : 1
foodie : 2
burger : 1
. . .

Apache Kafka – Workflow

As of now, we have discussed the core concepts of Kafka. Let us now throw some light on the workflow of Kafka.

Kafka is simply a collection of topics split into one or more partitions. A Kafka partition is a linearly ordered sequence of messages, where each message is identified by its index (called the offset). All the data in a Kafka cluster is the disjoint union of these partitions. Incoming messages are written at the end of a partition, and messages are read sequentially by consumers. Durability is provided by replicating messages to different brokers.

Kafka provides both a pub-sub and a queue-based messaging system in a fast, reliable, persistent, fault-tolerant and zero-downtime manner. In both cases, producers simply send messages to a topic, and consumers can choose either type of messaging system depending on their need. Let us follow the steps in the next section to understand how the consumer can choose the messaging system of its choice.

Workflow of Pub-Sub Messaging

Following is the step-wise workflow of Pub-Sub Messaging −

Producers send messages to a topic at regular intervals.

The Kafka broker stores all messages in the partitions configured for that particular topic. It ensures the messages are shared equally between partitions. If the producer sends two messages and there are two partitions, Kafka will store one message in the first partition and the second message in the second partition.

A consumer subscribes to a specific topic.

Once the consumer subscribes to a topic, Kafka provides the current offset of the topic to the consumer and also saves the offset in the ZooKeeper ensemble.

The consumer requests new messages from Kafka at regular intervals (for example, every 100 ms).

Once Kafka receives messages from producers, it forwards these messages to the consumers.

The consumer receives each message and processes it.

Once the messages are processed, the consumer sends an acknowledgement to the Kafka broker.

Once Kafka receives an acknowledgement, it changes the offset to the new value and updates it in ZooKeeper. Since offsets are maintained in ZooKeeper, the consumer can read the next message correctly even during server outages.

The above flow repeats until the consumer stops sending requests.

The consumer has the option to rewind/skip to the desired offset of a topic at any time and read all the subsequent messages.

Workflow of Queue Messaging / Consumer Group

In a queue messaging system, instead of a single consumer, a group of consumers having the same Group ID subscribes to a topic. In simple terms, consumers subscribing to a topic with the same Group ID are considered a single group, and the messages are shared among them. Let us check the actual workflow of this system.

Producers send messages to a topic at regular intervals.

Kafka stores all messages in the partitions configured for that particular topic, as in the earlier scenario.

A single consumer subscribes to a specific topic, assume Topic-01, with Group ID Group-1.

Kafka interacts with the consumer in the same way as in Pub-Sub Messaging until a new consumer subscribes to the same topic, Topic-01, with the same Group ID, Group-1.

Once the new consumer arrives, Kafka switches its operation to share mode and shares the data between the two consumers. This sharing goes on until the number of consumers reaches the number of partitions configured for that particular topic.
Once the number of consumers exceeds the number of partitions, a new consumer will not receive any further messages until one of the existing consumers unsubscribes. This scenario arises because each consumer in Kafka is assigned at least one partition; once all the partitions are assigned to the existing consumers, new consumers have to wait. This feature is also called a Consumer Group. In this way, Kafka provides the best of both systems in a very simple and efficient manner.

Role of ZooKeeper

A critical dependency of Apache Kafka is Apache ZooKeeper, which is a distributed configuration and synchronization service. ZooKeeper serves as the coordination interface between the Kafka brokers and consumers. The Kafka servers share information via a ZooKeeper cluster. Kafka stores basic metadata in ZooKeeper, such as information about topics, brokers, consumer offsets (queue readers) and so on.

Since all the critical information is stored in ZooKeeper, and ZooKeeper normally replicates this data across its ensemble, the failure of a Kafka broker or a ZooKeeper node does not affect the state of the Kafka cluster. Kafka will restore the state once ZooKeeper restarts. This gives zero downtime for Kafka. The leader election among the Kafka brokers is also done by using ZooKeeper in the event of leader failure.

To learn more about ZooKeeper, please refer to the ZooKeeper documentation.

Let us continue further with how to install Java, ZooKeeper, and Kafka on your machine in the next chapter.
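Before moving on, here is a minimal, hedged Java sketch (not part of the original text) of the consumer-group behaviour described above: start two copies of this program with the same group.id and Kafka will share the topic's partitions between them. The topic name Topic-01, the group id Group-1 and the local broker address are assumptions.

// Minimal sketch (assumptions: topic "Topic-01" with multiple partitions,
// broker at localhost:9092). Two copies running with the same group.id split
// the partitions; a copy with a different group.id receives every message,
// which is the pub-sub behaviour.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumer {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "Group-1");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
         consumer.subscribe(Collections.singletonList("Topic-01"));
         while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
               // Only one consumer in the group sees each record of a given partition.
               System.out.printf("group member got partition=%d offset=%d value=%s%n",
                     record.partition(), record.offset(), record.value());
            }
         }
      }
   }
}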

Integration With Spark

In this chapter, we will discuss how to integrate Apache Kafka with the Spark Streaming API.

About Spark

The Spark Streaming API enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, etc., and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in an RDD is divided into logical partitions, which may be computed on different nodes of the cluster.

Integration with Spark

Kafka is a potential messaging and integration platform for Spark Streaming. Kafka acts as the central hub for real-time streams of data, which are processed using complex algorithms in Spark Streaming. Once the data is processed, Spark Streaming could publish results into yet another Kafka topic or store them in HDFS, databases or dashboards. Now, let us go through the Kafka-Spark APIs in detail.

SparkConf API

It represents the configuration for a Spark application and is used to set various Spark parameters as key-value pairs. The SparkConf class has the following methods −

set(String key, String value) − set a configuration variable.

remove(String key) − remove a key from the configuration.

setAppName(String name) − set the application name for your application.

get(String key) − get the value of a key.

StreamingContext API

This is the main entry point for Spark Streaming functionality. The underlying SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators and broadcast variables on the cluster. The signature is defined as shown below −

public StreamingContext(String master, String appName, Duration batchDuration,
   String sparkHome, scala.collection.Seq<String> jars,
   scala.collection.Map<String,String> environment)

master − the cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).

appName − a name for your job, to display on the cluster web UI.

batchDuration − the time interval at which streaming data will be divided into batches.

public StreamingContext(SparkConf conf, Duration batchDuration)

Creates a StreamingContext by providing the configuration necessary for a new SparkContext.

conf − Spark parameters.

batchDuration − the time interval at which streaming data will be divided into batches.

KafkaUtils API

The KafkaUtils API is used to connect the Kafka cluster to Spark Streaming. This API has the significant method createStream, whose signature is defined below −

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

The above method is used to create an input stream that pulls messages from Kafka brokers.

ssc − the StreamingContext object.

zkQuorum − the ZooKeeper quorum.

groupId − the group id for this consumer.

topics − the map of topics to consume.

storageLevel − the storage level to use for storing the received objects.

The KafkaUtils API has another method, createDirectStream, which is used to create an input stream that directly pulls messages from Kafka brokers without using any receiver.
This stream can guarantee that each message from Kafka is included in transformations exactly once.

The sample application is written in Scala. To compile the application, please download and install sbt, the Scala build tool (similar to Maven). The main application code is presented below.

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

Build Script

The Spark-Kafka integration depends on the Spark, Spark Streaming and Spark-Kafka integration jars. Create a new file build.sbt and specify the application details and its dependencies. sbt will download the necessary jars while compiling and packaging the application.

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Compilation / Packaging

Run the following command to compile and package the jar file of the application. We need to submit the jar file to the Spark console to run the application.

sbt package

Submitting to Spark

Start the Kafka Producer CLI (explained in the previous chapter), create a new topic called my-first-topic and provide some sample messages as shown below.

Another spark test message

Run the following command to submit the application to the Spark console.

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

The sample output of this application is shown below.

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

Math & Trigonometric Functions

The Excel Math & Trig functions perform many of the common mathematical calculations, including basic arithmetic, conditional sums & products, exponents & logarithms, and the trigonometric ratios. Some more math-related functions are also discussed in the Statistical functions and Engineering functions categories.

The following table lists all the Math & Trigonometric functions −

1. ABS − Returns the absolute value of a number
2. ACOS − Returns the arccosine of a number
3. ACOSH − Returns the inverse hyperbolic cosine of a number
4. ACOT − Returns the arccotangent of a number
5. ACOTH − Returns the hyperbolic arccotangent of a number
6. AGGREGATE − Returns an aggregate in a list or database
7. ARABIC − Converts a Roman number to Arabic, as a number
8. ASIN − Returns the arcsine of a number
9. ASINH − Returns the inverse hyperbolic sine of a number
10. ATAN − Returns the arctangent of a number
11. ATAN2 − Returns the arctangent from x and y coordinates
12. ATANH − Returns the inverse hyperbolic tangent of a number
13. BASE − Converts a number into a text representation with the given radix (base)
14. CEILING.MATH − Rounds a number up, to the nearest integer or to the nearest multiple of significance
15. COMBIN − Returns the number of combinations for a given number of objects
16. COMBINA − Returns the number of combinations with repetitions for a given number of items
17. COS − Returns the cosine of a number
18. COSH − Returns the hyperbolic cosine of a number
19. COT − Returns the cotangent of an angle
20. COTH − Returns the hyperbolic cotangent of a number
21. CSC − Returns the cosecant of an angle
22. CSCH − Returns the hyperbolic cosecant of an angle
23. DECIMAL − Converts a text representation of a number in a given base into a decimal number
24. DEGREES − Converts radians to degrees
25. EVEN − Rounds a number up to the nearest even integer
26. EXP − Returns e raised to the power of a given number
27. FACT − Returns the factorial of a number
28. FACTDOUBLE − Returns the double factorial of a number
29. FLOOR.MATH − Rounds a number down, to the nearest integer or to the nearest multiple of significance
30. GCD − Returns the greatest common divisor
31. INT − Rounds a number down to the nearest integer
32. LCM − Returns the least common multiple
33. LN − Returns the natural logarithm of a number
34. LOG − Returns the logarithm of a number to a specified base
35. LOG10 − Returns the base-10 logarithm of a number
36. MDETERM − Returns the matrix determinant of an array
37. MINVERSE − Returns the matrix inverse of an array
38. MMULT − Returns the matrix product of two arrays
39. MOD − Returns the remainder from division
40. MROUND − Returns a number rounded to the desired multiple
41. MULTINOMIAL − Returns the multinomial of a set of numbers
42. MUNIT − Returns the unit matrix of the specified dimension
43. ODD − Rounds a number up to the nearest odd integer
44. PI − Returns the value of pi
45. POWER − Returns the result of a number raised to a power
46. PRODUCT − Multiplies its arguments
47. QUOTIENT − Returns the integer portion of a division
48. RADIANS − Converts degrees to radians
49. RAND − Returns a random number between 0 and 1
50. RANDBETWEEN − Returns a random number between the numbers that you specify
51. ROMAN − Converts an Arabic numeral to Roman, as text
52. ROUND − Rounds a number to a specified number of digits
53. ROUNDDOWN − Rounds a number down, toward 0
54. ROUNDUP − Rounds a number up, away from 0
55. SEC − Returns the secant of an angle
56. SECH − Returns the hyperbolic secant of an angle
57. SERIESSUM − Returns the sum of a power series based on the formula
58. SIGN − Returns the sign of a number
59. SIN − Returns the sine of the given angle
60. SINH − Returns the hyperbolic sine of a number
61. SQRT − Returns a positive square root
62. SQRTPI − Returns the square root of pi
63. SUBTOTAL − Returns a subtotal in a list or database
64. SUM − Adds its arguments
65. SUMIF − Adds the cells specified by a given criteria
66. SUMIFS − Adds the cells specified by multiple criteria
67. SUMPRODUCT − Returns the sum of the products of corresponding array components
68. SUMSQ − Returns the sum of the squares of the arguments
69. SUMX2MY2 − Returns the sum of the difference of squares of corresponding values in two arrays
70. SUMX2PY2 − Returns the sum of the sum of squares of corresponding values in two arrays
71. SUMXMY2 − Returns the sum of squares of differences of corresponding values in two arrays
72. TAN − Returns the tangent of a number
73. TANH − Returns the hyperbolic tangent of a number
74. TRUNC − Truncates a number (you specify the precision of the truncation)

Advanced Excel Functions – Logical

Logical functions include the boolean operators and conditional tests, which will be an essential part of many working spreadsheets.

The following table lists all the Logical functions −

1. AND − Returns TRUE if all its arguments are TRUE.
2. FALSE − Returns the logical value FALSE.
3. IF − Specifies a logical test to perform.
4. IFERROR − Returns a different result if the first argument evaluates to an error.
5. IFNA − Returns the value you specify if the expression resolves to #N/A, otherwise returns the result of the expression.
6. IFS − Checks whether one or more conditions are met and returns a value that corresponds to the first TRUE condition.
7. NOT − Reverses the logic of its argument.
8. OR − Returns TRUE if any argument is TRUE.
9. SWITCH − Evaluates an expression against a list of values and returns the result corresponding to the first matching value. If there is no match, an optional default value may be returned.
10. TRUE − Returns the logical value TRUE.
11. XOR − Returns a logical exclusive OR of all arguments.

Statistical Functions

Statistical functions perform calculations ranging from basic mean, median & mode to the more complex statistical distribution and probability tests.

The following table lists all the Statistical functions −

1. AVEDEV − Returns the average of the absolute deviations of data points from their mean
2. AVERAGE − Returns the average of its arguments
3. AVERAGEA − Returns the average of its arguments and includes evaluation of text and logical values
4. AVERAGEIF − Returns the average for the cells specified by a given criterion
5. AVERAGEIFS − Returns the average for the cells specified by multiple criteria
6. BETA.DIST − Returns the beta cumulative distribution function
7. BETA.INV − Returns the inverse of the cumulative distribution function for a specified beta distribution
8. BINOM.DIST − Returns the individual term binomial distribution probability
9. BINOM.DIST.RANGE − Returns the probability of a trial result using a binomial distribution
10. BINOM.INV − Returns the smallest value for which the cumulative binomial distribution is less than or equal to a criterion value
11. CHISQ.DIST − Returns the cumulative beta probability density function
12. CHISQ.DIST.RT − Returns the one-tailed probability of the chi-squared distribution
13. CHISQ.INV − Returns the cumulative beta probability density function
14. CHISQ.INV.RT − Returns the inverse of the one-tailed probability of the chi-squared distribution
15. CHISQ.TEST − Returns the test for independence
16. CONFIDENCE.NORM − Returns the confidence interval for a population mean
17. CONFIDENCE.T − Returns the confidence interval for a population mean, using a Student's t distribution
18. CORREL − Returns the correlation coefficient between two data sets
19. COUNT − Counts how many numbers are in the list of arguments
20. COUNTA − Counts how many values are in the list of arguments
21. COUNTBLANK − Counts the number of blank cells in the argument range
22. COUNTIF − Counts the number of cells that meet the criteria you specify in the argument
23. COUNTIFS − Counts the number of cells that meet multiple criteria
24. COVARIANCE.P − Returns covariance, the average of the products of paired deviations
25. COVARIANCE.S − Returns the sample covariance, the average of the products of deviations for each data point pair in two data sets
26. DEVSQ − Returns the sum of squares of deviations
27. EXPON.DIST − Returns the exponential distribution
28. F.DIST − Returns the F probability distribution
29. F.DIST.RT − Returns the F probability distribution
30. F.INV − Returns the inverse of the F probability distribution
31. F.INV.RT − Returns the inverse of the F probability distribution
32. F.TEST − Returns the result of an F-test
33. FISHER − Returns the Fisher transformation
34. FISHERINV − Returns the inverse of the Fisher transformation
35. FORECAST − Returns a value along a linear trend
36. FORECAST.ETS − Calculates a future value based on existing values using the Exponential Triple Smoothing (ETS) algorithm
37. FORECAST.ETS.CONFINT − Returns a confidence interval for the forecast value at the specified target date
38. FORECAST.ETS.SEASONALITY − Returns the length of the repetitive pattern detected for the specified time series
39. FORECAST.ETS.STAT − Returns a statistical value as a result of time series forecasting
40. FORECAST.LINEAR − Calculates a future value by using existing values, using linear regression
41. FREQUENCY − Returns a frequency distribution as a vertical array
42. GAMMA − Returns the Gamma function value
43. GAMMA.DIST − Returns the gamma distribution
44. GAMMA.INV − Returns the inverse of the gamma cumulative distribution
45. GAMMALN − Returns the natural logarithm of the gamma function, G(x)
46. GAMMALN.PRECISE − Returns the natural logarithm of the gamma function, G(x)
47. GAUSS − Returns 0.5 less than the standard normal cumulative distribution
48. GEOMEAN − Returns the geometric mean
49. GROWTH − Returns values along an exponential trend
50. HARMEAN − Returns the harmonic mean
51. HYPGEOM.DIST − Returns the hypergeometric distribution
52. INTERCEPT − Returns the intercept of the linear regression line
53. KURT − Returns the kurtosis of a data set
54. LARGE − Returns the kth largest value in a data set
55. LINEST − Returns the parameters of a linear trend
56. LOGEST − Returns the parameters of an exponential trend
57. LOGNORM.DIST − Returns the cumulative lognormal distribution
58. LOGNORM.INV − Returns the inverse of the lognormal cumulative distribution
59. MAX − Returns the maximum value in a list of arguments, ignoring logical values and text
60. MAXA − Returns the maximum value in a list of arguments, including logical values and text
61. MAXIFS − Returns the maximum value among cells specified by a given set of conditions or criteria
62. MEDIAN − Returns the median of the given numbers
63. MIN − Returns the minimum value in a list of arguments, ignoring logical values and text
64. MINA − Returns the minimum value in a list of arguments, including logical values and text
65. MINIFS − Returns the minimum value among cells specified by a given set of conditions or criteria
66. MODE.MULT − Returns a vertical array of the most frequently occurring, or repetitive, values in an array or range of data
67. MODE.SNGL − Returns the most common value in a data set
68. NEGBINOM.DIST − Returns the negative binomial distribution
69. NORM.DIST − Returns the normal cumulative distribution
70. NORM.INV − Returns the inverse of the normal cumulative distribution
71. NORM.S.DIST − Returns the standard normal cumulative distribution
72. NORM.S.INV − Returns the inverse of the standard normal cumulative distribution
73. PEARSON − Returns the Pearson product moment correlation coefficient
74. PERCENTILE.EXC − Returns the k-th percentile of values in a range, where k is in the range 0..1, exclusive
75. PERCENTILE.INC − Returns the k-th percentile of values in a range
76. PERCENTRANK.EXC − Returns the rank of a value in a data set as a percentage (0..1, exclusive) of the data set
77. PERCENTRANK.INC − Returns the percentage rank of a value in a data set
78. PERMUT − Returns the number of permutations for a given number of objects
79. PERMUTATIONA − Returns the number of permutations for a given number of objects (with repetitions) that can be selected from the total objects
80. PHI − Returns the value of the density function for a standard normal distribution
81. POISSON.DIST − Returns the Poisson distribution
82. PROB − Returns the probability that values in a range are between two limits
83. QUARTILE.EXC − Returns the quartile of the data set, based on percentile values from 0..1, exclusive