RxJava – Creating Operators ”; Previous Next Following are the operators which are used to create an Observable. Sr.No. Operator & Description 1 Create Creates an Observable from scratch and allows observer method to call programmatically. 2 Defer Do not create an Observable until an observer subscribes. Creates a fresh observable for each observer. 3 Empty/Never/Throw Creates an Observable with limited behavior. 4 From Converts an object/data structure into an Observable. 5 Interval Creates an Observable emitting integers in sequence with a gap of specified time interval. 6 Just Converts an object/data structure into an Observable to emit the same or same type of objects. 7 Range Creates an Observable emitting integers in sequence of given range. 8 Repeat Creates an Observable emitting integers in sequence repeatedly. 9 Start Creates an Observable to emit the return value of a function. 10 Timer Creates an Observable to emit a single item after given delay. Creating Operator Example Create the following Java program using any editor of your choice in, say, C:> RxJava. ObservableTester.java import io.reactivex.Observable; //Using fromArray operator to create an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {“a”, “b”, “c”, “d”, “e”, “f”, “g”}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .map(String::toUpperCase) .subscribe( letter -> result.append(letter)); System.out.println(result); } } Verify the Result Compile the class using javac compiler as follows − C:RxJava>javac ObservableTester.java Now run the ObservableTester as follows − C:RxJava>java ObservableTester It should produce the following output − ABCDEFG Print Page Previous Next Advertisements ”;
Category: rxjava
RxJava – Environment Setup
RxJava – Environment Setup ”; Previous Next Local Environment Setup RxJava is a library for Java, so the very first requirement is to have JDK installed in your machine. System Requirement JDK 1.5 or above. Memory No minimum requirement. Disk Space No minimum requirement. Operating System No minimum requirement. Step 1 – Verify Java Installation in Your Machine First of all, open the console and execute a java command based on the operating system you are working on. OS Task Command Windows Open Command Console c:> java -version Linux Open Command Terminal $ java -version Mac Open Terminal machine:< joseph$ java -version Let”s verify the output for all the operating systems − OS Output Windows java version “1.8.0_101” Java(TM) SE Runtime Environment (build 1.8.0_101) Linux java version “1.8.0_101” Java(TM) SE Runtime Environment (build 1.8.0_101) Mac java version “1.8.0_101” Java(TM) SE Runtime Environment (build 1.8.0_101) If you do not have Java installed on your system, then download the Java Software Development Kit (SDK) from the following link https://www.oracle.com. We are assuming Java 1.8.0_101 as the installed version for this tutorial. Step 2 – Set JAVA Environment Set the JAVA_HOME environment variable to point to the base directory location where Java is installed on your machine. For example. OS Output Windows Set the environment variable JAVA_HOME to C:Program FilesJavajdk1.8.0_101 Linux export JAVA_HOME = /usr/local/java-current Mac export JAVA_HOME = /Library/Java/Home Append Java compiler location to the System Path. OS Output Windows Append the string C:Program FilesJavajdk1.8.0_101bin at the end of the system variable, Path. Linux export PATH = $PATH:$JAVA_HOME/bin/ Mac not required Verify Java installation using the command java -version as explained above. Step 3 – Download RxJava2 Archive Download the latest version of RxJava jar file from RxJava @ MVNRepository and its dependency Reactive Streams @ MVNRepository . At the time of writing this tutorial, we have downloaded rxjava-2.2.4.jar, reactive-streams-1.0.2.jar and copied it into C:>RxJava folder. OS Archive name Windows rxjava-2.2.4.jar, reactive-streams-1.0.2.jar Linux rxjava-2.2.4.jar, reactive-streams-1.0.2.jar Mac rxjava-2.2.4.jar, reactive-streams-1.0.2.jar Step 4 – Set RxJava Environment Set the RX_JAVA environment variable to point to the base directory location where RxJava jar is stored on your machine. Let’s assuming we”ve stored rxjava-2.2.4.jar and reactive-streams-1.0.2.jar in the RxJava folder. Sr.No OS & Description 1 Windows Set the environment variable RX_JAVA to C:RxJava 2 Linux export RX_JAVA = /usr/local/RxJava 3 Mac export RX_JAVA = /Library/RxJava Step 5 – Set CLASSPATH Variable Set the CLASSPATH environment variable to point to the RxJava jar location. Sr.No OS & Description 1 Windows Set the environment variable CLASSPATH to %CLASSPATH%;%RX_JAVA%rxjava-2.2.4.jar;%RX_JAVA%reactive-streams-1.0.2.jar;.; 2 Linux export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:. 3 Mac export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:. Step 6 – Test RxJava Setup Create a class TestRx.java as shown below − import io.reactivex.Flowable; public class TestRx { public static void main(String[] args) { Flowable.just(“Hello World!”).subscribe(System.out::println); } } Step 7 – Verify the Result Compile the classes using javac compiler as follows − C:RxJava>javac Tester.java Verify the output. Hello World! Print Page Previous Next Advertisements ”;
RxJava – BehaviorSubject
RxJava – BehaviorSubject ”; Previous Next BehaviorSubject emits the most recent item it has observed and then all subsequent observed items to each subscribed Observer. Class Declaration Following is the declaration for io.reactivex.subjects.BehaviorSubject<T> class − public final class BehaviorSubject<T> extends Subject<T> BehaviorSubject Example Create the following Java program using any editor of your choice in, say, C:> RxJava. ObservableTester.java import io.reactivex.subjects.BehaviorSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); BehaviorSubject<String> subject = BehaviorSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext(“a”); subject.onNext(“b”); subject.onNext(“c”); subject.subscribe(value -> result2.append(value)); subject.onNext(“d”); subject.onComplete(); //Output will be abcd System.out.println(result1); //Output will be cd being BehaviorSubject //(c is last item emitted before subscribe) System.out.println(result2); } } Verify the Result Compile the class using javac compiler as follows − C:RxJava>javac ObservableTester.java Now run the ObservableTester as follows − C:RxJava>java ObservableTester It should produce the following output − abcd cd Print Page Previous Next Advertisements ”;
RxJava – Computation Scheduler ”; Previous Next Schedulers.computation() method creates and returns a Scheduler intended for computational work. Count of threads to be scheduled depends upon the CPUs present in the system. One thread is allowed per CPU. Best for event-loops or callback operations. Schedulers.computation() Example Create the following Java program using any editor of your choice in, say, C:> RxJava. ObservableTester.java import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just(“A”, “AB”, “ABC”) .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println(“Processing Thread ” + Thread.currentThread().getName())) .subscribeOn(Schedulers.computation())) .subscribe(length -> System.out.println(“Receiver Thread ” + Thread.currentThread().getName() + “, Item length ” + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } } Verify the Result Compile the class using javac compiler as follows − C:RxJava>javac ObservableTester.java Now run the ObservableTester as follows − C:RxJava>java ObservableTester It should produce the following output − Processing Thread RxComputationThreadPool-1 Receiver Thread RxComputationThreadPool-1, Item length 1 Processing Thread RxComputationThreadPool-2 Receiver Thread RxComputationThreadPool-2, Item length 2 Processing Thread RxComputationThreadPool-3 Receiver Thread RxComputationThreadPool-3, Item length 3 Print Page Previous Next Advertisements ”;
RxJava – Overview
RxJava – Overview ”; Previous Next RxJava is a Java based extension of ReactiveX. It provides implementation or ReactiveX project in Java. Following are the key characteristics of RxJava. Extends the observer pattern. Support sequences of data/events. Provides operators to compose sequences together declaratively. Handles threading, synchronization, thread-safety and concurrent data structures internally. What is ReactiveX? ReactiveX is a project which aims to provide reactive programming concept to various programming languages. Reactive Programming refers to the scenario where program reacts as and when data appears. It is a event based programming concept and events can propagate to registers observers. As per the Reactive, they have combined the best of Observer pattern, Iterator pattern and functional pattern. The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming. Functional Programming Functional programming revolves around building the software using pure functions. A pure function do not depends upon previous state and always returns the same result for the same parameters passed. Pure functions helps avoiding problems associated with shared objects, mutable data and side effects often prevalent in multi-threading environments. Reactive Programming Reactive programming refers to event driven programming where data streams comes in asynchronous fashion and get processed when they are arrived. Functional Reactive Programming RxJava implements both the concepts together, where data of streams changes over time and consumer function reacts accordingly. The Reactive Manifesto Reactive Manifesto is an on-line document stating the high standard of application software systems. As per the manifesto, following are the key attributes of a reactive software − Responsive − Should always respond in a timely fashion. Message Driven − Should use asynchronous message-passing between components so that they maintain loose coupling. Elastic − Should stay responsive even under high load. Resilient − Should stay responsive even if any component(s) fail. Key components of RxJava RxJava have two key components: Observables and Observer. Observable − It represents an object similar to Stream which can emit zero or more data, can send error message, whose speed can be controlled while emitting a set of data, can send finite as well as infinite data. Observer − It subscribes to Observable”s data of sequence and reacts per item of the observables. Observers are notified whenever Observable emits a data. An Observer handles data one by one. An observer is never notified if items are not present or a callback is not returned for a previous item. Print Page Previous Next Advertisements ”;
RxJava – Trampoline Scheduler ”; Previous Next Schedulers.trampoline() method creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes. Schedulers.trampoline() Example Create the following Java program using any editor of your choice in, say, C:> RxJava. ObservableTester.java import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just(“A”, “AB”, “ABC”) .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println(“Processing Thread ” + Thread.currentThread().getName())) .subscribeOn(Schedulers.trampoline())) .subscribe(length -> System.out.println(“Receiver Thread ” + Thread.currentThread().getName() + “, Item length ” + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } } Verify the Result Compile the class using javac compiler as follows − C:RxJava>javac ObservableTester.java Now run the ObservableTester as follows − C:RxJava>java ObservableTester It should produce the following output − Processing Thread main Receiver Thread main, Item length 1 Processing Thread main Receiver Thread main, Item length 2 Processing Thread main Receiver Thread main, Item length 3 Print Page Previous Next Advertisements ”;
RxJava – How Observable works ”; Previous Next Observables represents the sources of data where as Observers (Subscribers) listen to them. In nutshell, an Observable emits items and a Subscriber then consumes these items. Observable Observable provides data once subscriber starts listening. Observable can emit any number of items. Observable can emit only signal of completion as well with no item. Observable can terminate successfully. Observable may never terminate. e.g. a button can be clicked any number of times. Observable may throw error at any point of time. Subscriber Observable can have multiple subscribers. When an Observable emits an item, each subscriber onNext() method gets invoked. When an Observable finished emitting items, each subscriber onComplete() method gets invoked. If an Observable emits error, each subscriber onError() method gets invoked. Print Page Previous Next Advertisements ”;
RxJava – Schedulers
RxJava – Schedulers ”; Previous Next Schedulers are used in multi-threading environment to work with Observable operators. As per the Reactive,Scheduler are used to schedule how chain of operators will apply to different threads. By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers. There are following types of Schedulers available in RxJava − Sr.No. Scheduler & Description 1 Schedulers.computation() Creates and returns a Scheduler intended for computational work. Count of threads to be scheduled depends upon the CPUs present in the system. One thread is allowed per CPU. Best for event-loops or callback operations. 2 Schedulers.io() Creates and returns a Scheduler intended for IO-bound work. Thread pool may extend as needed. 3 Schedulers.newThread() Creates and returns a Scheduler that creates a new Thread for each unit of work. 4 Schedulers.trampoline() Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes. 4 Schedulers.from(java.util.concurrent.Executor executor) Converts an Executor into a new Scheduler instance. Print Page Previous Next Advertisements ”;
RxJava – MayBe Observable
RxJava – MayBe Observable ”; Previous Next The MayBe class represents deferred response. MayBe observable can emit either a single successful value or no value. Class Declaration Following is the declaration for io.reactivex.Single<T> class − public abstract class Maybe<T> extends Object implements MaybeSource<T> Protocol Following is the sequential protocol that MayBe Observable operates − onSubscribe (onSuccess | onError | OnComplete)? MayBe Example Create the following Java program using any editor of your choice in, say, C:> RxJava. ObservableTester.java import java.util.concurrent.TimeUnit; import io.reactivex.Maybe; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableMaybeObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create an observer Disposable disposable = Maybe.just(“Hello World”) .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableMaybeObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } @Override public void onComplete() { System.out.println(“Done!”); } }); Thread.sleep(3000); //start observing disposable.dispose(); } } Verify the Result Compile the class using javac compiler as follows − C:RxJava>javac ObservableTester.java Now run the ObservableTester as follows − C:RxJava>java ObservableTester It should produce the following output − Hello World Print Page Previous Next Advertisements ”;