Discuss RxJava

RxJava is a Java-based extension of ReactiveX. ReactiveX is a project that aims to bring reactive programming concepts to various programming languages. Reactive programming refers to the scenario where a program reacts as and when data appears. It is an event-based programming concept in which events propagate to registered observers.

The ReactiveX project describes itself as follows: "The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming."
RxJava – Filtering Operators
The following operators are used to selectively emit items from an Observable.

Sr.No.  Operator − Description
1   Debounce − Emits an item only when a given timespan has passed without the source emitting another item.
2   Distinct − Emits only unique items.
3   ElementAt − Emits only the item at index n emitted by an Observable.
4   Filter − Emits only those items that pass the given predicate function.
5   First − Emits the first item, or the first item that meets the given criteria.
6   IgnoreElements − Does not emit any items from the Observable but signals completion.
7   Last − Emits the last item from an Observable.
8   Sample − Emits the most recent item within each given time interval.
9   Skip − Skips the first n items from an Observable.
10  SkipLast − Skips the last n items from an Observable.
11  Take − Takes the first n items from an Observable.
12  TakeLast − Takes the last n items from an Observable.

Filtering Operator Example

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

    import io.reactivex.Observable;

    // Using the take operator to filter an Observable
    public class ObservableTester {
        public static void main(String[] args) {
            String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
            final StringBuilder result = new StringBuilder();
            Observable<String> observable = Observable.fromArray(letters);
            observable
                .take(2)
                .subscribe(letter -> result.append(letter));
            System.out.println(result);
        }
    }

Verify the Result

Compile the class using the javac compiler as follows −

    C:\RxJava>javac ObservableTester.java

Now run ObservableTester as follows −

    C:\RxJava>java ObservableTester

It should produce the following output −

    ab
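For readers who already know java.util.stream, some of the filtering operators listed above have close synchronous counterparts there. The sketch below is an analogy only and does not use RxJava: streams are pull-based and run on the calling thread, while an Observable pushes items to its observers. The class name FilteringAnalogy and its helper methods are hypothetical names chosen for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java analogy for four filtering operators; no RxJava dependency.
class FilteringAnalogy {

    // Analogue of take(n): keep only the first n items.
    static <T> List<T> take(List<T> items, int n) {
        return items.stream().limit(n).collect(Collectors.toList());
    }

    // Analogue of skip(n): drop the first n items.
    static <T> List<T> skip(List<T> items, int n) {
        return items.stream().skip(n).collect(Collectors.toList());
    }

    // Analogue of distinct(): keep the first occurrence of each item.
    static <T> List<T> distinct(List<T> items) {
        return items.stream().distinct().collect(Collectors.toList());
    }

    // Analogue of filter(predicate): keep items that pass the predicate.
    static <T> List<T> filter(List<T> items, Predicate<T> predicate) {
        return items.stream().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "a", "c", "b", "d");
        System.out.println(take(letters, 2));                           // [a, b]
        System.out.println(skip(letters, 4));                           // [b, d]
        System.out.println(distinct(letters));                          // [a, b, c, d]
        System.out.println(filter(letters, s -> s.compareTo("b") > 0)); // [c, d]
    }
}
```

Time-based operators such as Debounce and Sample have no direct stream counterpart, because a stream has no notion of when an item arrives.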
RxJava – Using CompositeDisposable

The CompositeDisposable class represents a container that can hold multiple disposables and offers O(1) complexity for adding and removing them.

Class Declaration

Following is the declaration for the io.reactivex.disposables.CompositeDisposable class −

    public final class CompositeDisposable
        extends Object
        implements Disposable, io.reactivex.internal.disposables.DisposableContainer

CompositeDisposable Example

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

    import io.reactivex.Maybe;
    import io.reactivex.Single;
    import io.reactivex.disposables.CompositeDisposable;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.observers.DisposableMaybeObserver;
    import io.reactivex.observers.DisposableSingleObserver;
    import io.reactivex.schedulers.Schedulers;
    import java.util.concurrent.TimeUnit;

    public class ObservableTester {
        public static void main(String[] args) throws InterruptedException {
            CompositeDisposable compositeDisposable = new CompositeDisposable();

            // Subscribe to a Single that emits after a 2-second delay
            Disposable disposableSingle = Single.just("Hello World")
                .delay(2, TimeUnit.SECONDS, Schedulers.io())
                .subscribeWith(new DisposableSingleObserver<String>() {
                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onSuccess(String value) {
                        System.out.println(value);
                    }
                });

            // Subscribe to a Maybe that emits after a 2-second delay
            Disposable disposableMayBe = Maybe.just("Hi")
                .delay(2, TimeUnit.SECONDS, Schedulers.io())
                .subscribeWith(new DisposableMaybeObserver<String>() {
                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onSuccess(String value) {
                        System.out.println(value);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                });

            // Give both sources time to emit before disposing
            Thread.sleep(3000);

            compositeDisposable.add(disposableSingle);
            compositeDisposable.add(disposableMayBe);

            // Dispose of both subscriptions in a single call
            compositeDisposable.dispose();
        }
    }

Verify the Result

Compile the class using the javac compiler as follows −

    C:\RxJava>javac ObservableTester.java

Now run ObservableTester as follows −

    C:\RxJava>java ObservableTester

It should produce output similar to the following (both sources emit on separate threads after the same delay, so the order of the two lines is not guaranteed) −

    Hello World
    Hi
RxJava – Completable Observable

The Completable class represents a deferred computation that produces no value. A Completable can only signal successful completion or an error.

Class Declaration

Following is the declaration for the io.reactivex.Completable class −

    public abstract class Completable
        extends Object
        implements CompletableSource

Protocol

Following is the sequential protocol that a Completable follows −

    onSubscribe (onError | onComplete)?

Completable Example

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

    import java.util.concurrent.TimeUnit;

    import io.reactivex.Completable;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.observers.DisposableCompletableObserver;
    import io.reactivex.schedulers.Schedulers;

    public class ObservableTester {
        public static void main(String[] args) throws InterruptedException {
            // Subscribe to a Completable that completes after a 2-second delay
            Disposable disposable = Completable.complete()
                .delay(2, TimeUnit.SECONDS, Schedulers.io())
                .subscribeWith(new DisposableCompletableObserver() {
                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onStart() {
                        System.out.println("Started!");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                });

            // Wait for completion, then release the subscription
            Thread.sleep(3000);
            disposable.dispose();
        }
    }

Verify the Result

Compile the class using the javac compiler as follows −

    C:\RxJava>javac ObservableTester.java

Now run ObservableTester as follows −

    C:\RxJava>java ObservableTester

It should produce the following output −

    Started!
    Done!
RxJava – Useful Resources
The following resources contain additional information on RxJava. Please use them to get more in-depth knowledge on this topic.

Useful Links on RxJava

RxJava Official Website − A complete resource for RxJava.
ReactiveX − The official ReactiveX website.
RxJava – Windowing
The window operator works like the buffer operator, but instead of gathering the items emitted by an Observable into collections and emitting those collections, it gathers them into Observables and emits those Observables. In the example below, we've created an Observable that emits 9 items; with window(3), three Observables are emitted, each carrying 3 items.

Windowing Example

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

    import io.reactivex.Observable;
    import io.reactivex.Observer;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.schedulers.Schedulers;

    import java.util.concurrent.TimeUnit;

    public class ObservableTester {
        public static void main(String[] args) throws InterruptedException {
            Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9);

            observable.subscribeOn(Schedulers.io())
                .delay(2, TimeUnit.SECONDS, Schedulers.io())
                .window(3)
                .subscribe(new Observer<Observable<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("Subscribed");
                    }

                    @Override
                    public void onNext(Observable<Integer> integers) {
                        // Each window is itself an Observable, so subscribe to it
                        System.out.println("onNext: ");
                        integers.subscribe(value -> System.out.println(value));
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("Error");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Done! ");
                    }
                });

            // Keep the main thread alive until the delayed items arrive
            Thread.sleep(3000);
        }
    }

Verify the Result

Compile the class using the javac compiler as follows −

    C:\RxJava>javac ObservableTester.java

Now run ObservableTester as follows −

    C:\RxJava>java ObservableTester

It should produce the following output −

    Subscribed
    onNext:
    1
    2
    3
    onNext:
    4
    5
    6
    onNext:
    7
    8
    9
    Done!
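The count-based grouping that buffer and window share can be pictured with ordinary lists. The following is a minimal plain-Java sketch, not RxJava code; the class WindowSketch and its chunk method are hypothetical names used only for illustration. It builds every group eagerly, which is closer to buffer; window instead hands out each group as its own Observable while items are still arriving.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java picture of count-based grouping; no RxJava dependency.
class WindowSketch {

    // Splits items into consecutive groups of at most `count` elements,
    // preserving the original order.
    static <T> List<List<T>> chunk(List<T> items, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> groups = new ArrayList<>();
        for (int start = 0; start < items.size(); start += count) {
            int end = Math.min(start + count, items.size());
            groups.add(new ArrayList<>(items.subList(start, end)));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        // Mirrors the grouping in the example's output: three groups of three.
        System.out.println(chunk(items, 3)); // [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    }
}
```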
RxJava – NewThread Scheduler
The Schedulers.newThread() method creates and returns a Scheduler that creates a new Thread for each unit of work.

Schedulers.newThread() Example

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

    import java.util.Random;

    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;

    public class ObservableTester {
        public static void main(String[] args) throws InterruptedException {
            Observable.just("A", "AB", "ABC")
                .flatMap(v -> getLengthWithDelay(v)
                    .doOnNext(s -> System.out.println("Processing Thread "
                        + Thread.currentThread().getName()))
                    .subscribeOn(Schedulers.newThread()))
                .subscribe(length -> System.out.println("Receiver Thread "
                    + Thread.currentThread().getName()
                    + ", Item length " + length));

            // Keep the main thread alive while the worker threads run
            Thread.sleep(10000);
        }

        protected static Observable<Integer> getLengthWithDelay(String v) {
            Random random = new Random();
            try {
                // Sleep 0, 1 or 2 seconds to simulate work
                Thread.sleep(random.nextInt(3) * 1000);
                return Observable.just(v.length());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and signal the error downstream;
                // returning null here would make flatMap fail.
                Thread.currentThread().interrupt();
                return Observable.error(e);
            }
        }
    }

Verify the Result

Compile the class using the javac compiler as follows −

    C:\RxJava>javac ObservableTester.java

Now run ObservableTester as follows −

    C:\RxJava>java ObservableTester

It should produce output similar to the following (the ordering of lines can vary between runs) −

    Processing Thread RxNewThreadScheduler-1
    Receiver Thread RxNewThreadScheduler-1, Item length 1
    Processing Thread RxNewThreadScheduler-2
    Receiver Thread RxNewThreadScheduler-2, Item length 2
    Processing Thread RxNewThreadScheduler-3
    Receiver Thread RxNewThreadScheduler-3, Item length 3
RxJava – IO Scheduler
The Schedulers.io() method creates and returns a Scheduler intended for IO-bound work. Its thread pool may grow as needed, which makes it best suited to I/O-intensive operations.

Schedulers.io() Example

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

    import java.util.Random;

    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;

    public class ObservableTester {
        public static void main(String[] args) throws InterruptedException {
            Observable.just("A", "AB", "ABC")
                .flatMap(v -> getLengthWithDelay(v)
                    .doOnNext(s -> System.out.println("Processing Thread "
                        + Thread.currentThread().getName()))
                    .subscribeOn(Schedulers.io()))
                .subscribe(length -> System.out.println("Receiver Thread "
                    + Thread.currentThread().getName()
                    + ", Item length " + length));

            // Keep the main thread alive while the worker threads run
            Thread.sleep(10000);
        }

        protected static Observable<Integer> getLengthWithDelay(String v) {
            Random random = new Random();
            try {
                // Sleep 0, 1 or 2 seconds to simulate work
                Thread.sleep(random.nextInt(3) * 1000);
                return Observable.just(v.length());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and signal the error downstream;
                // returning null here would make flatMap fail.
                Thread.currentThread().interrupt();
                return Observable.error(e);
            }
        }
    }

Verify the Result

Compile the class using the javac compiler as follows −

    C:\RxJava>javac ObservableTester.java

Now run ObservableTester as follows −

    C:\RxJava>java ObservableTester

It should produce output similar to the following (thread names and the ordering of lines can vary between runs) −

    Processing Thread RxCachedThreadScheduler-1
    Receiver Thread RxCachedThreadScheduler-1, Item length 1
    Processing Thread RxCachedThreadScheduler-1
    Receiver Thread RxCachedThreadScheduler-1, Item length 2
    Processing Thread RxCachedThreadScheduler-1
    Receiver Thread RxCachedThreadScheduler-1, Item length 3
RxJava – From Scheduler
The Schedulers.from(Executor) method converts an Executor into a new Scheduler instance.

Schedulers.from(Executor) Example

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

    import java.util.Random;
    import java.util.concurrent.Executors;

    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;

    public class ObservableTester {
        public static void main(String[] args) throws InterruptedException {
            Observable.just("A", "AB", "ABC")
                .flatMap(v -> getLengthWithDelay(v)
                    .doOnNext(s -> System.out.println("Processing Thread "
                        + Thread.currentThread().getName()))
                    // A new pool is created here for every item
                    .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
                .subscribe(length -> System.out.println("Receiver Thread "
                    + Thread.currentThread().getName()
                    + ", Item length " + length));

            // Keep the main thread alive while the worker threads run
            Thread.sleep(10000);
        }

        protected static Observable<Integer> getLengthWithDelay(String v) {
            Random random = new Random();
            try {
                // Sleep 0, 1 or 2 seconds to simulate work
                Thread.sleep(random.nextInt(3) * 1000);
                return Observable.just(v.length());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and signal the error downstream;
                // returning null here would make flatMap fail.
                Thread.currentThread().interrupt();
                return Observable.error(e);
            }
        }
    }

Because Executors.newFixedThreadPool(3) is called inside the flatMap lambda, a separate pool is created for each item, which is why the thread names in the output refer to different pools. These pools are never shut down, so their non-daemon threads can keep the JVM running after main returns.

Verify the Result

Compile the class using the javac compiler as follows −

    C:\RxJava>javac ObservableTester.java

Now run ObservableTester as follows −

    C:\RxJava>java ObservableTester

It should produce output similar to the following (pool numbers and the ordering of lines can vary between runs) −

    Processing Thread pool-1-thread-1
    Processing Thread pool-3-thread-1
    Receiver Thread pool-1-thread-1, Item length 1
    Processing Thread pool-4-thread-1
    Receiver Thread pool-4-thread-1, Item length 3
    Receiver Thread pool-3-thread-1, Item length 2
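When an Executor is meant to back a Scheduler, it is common to create the pool once, reuse it for every task, and shut it down when the work is finished. The sketch below shows that lifecycle in plain Java only, with no RxJava dependency; SharedExecutorSketch and runOnSharedPool are hypothetical names for this illustration. In an RxJava program the same single pool could presumably be wrapped once with Schedulers.from(Executor) and reused.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Lifecycle of a single shared thread pool: create once, reuse, shut down.
class SharedExecutorSketch {

    // Runs one task per input on the same pool and returns, for each input,
    // the name of the worker thread that handled it.
    static List<String> runOnSharedPool(List<String> inputs, int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String input : inputs) {
                futures.add(pool.submit(
                    () -> Thread.currentThread().getName() + " handled " + input));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // waits for the task to finish
            }
            return results;
        } finally {
            // Without this call the pool's non-daemon threads stay alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (String line : runOnSharedPool(Arrays.asList("A", "AB", "ABC"), 3)) {
            System.out.println(line);
        }
    }
}
```

With one shared pool, every thread name carries the same pool number, and the program exits on its own once the tasks are done.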
RxJava – Quick Guide
RxJava – Overview

RxJava is a Java-based extension of ReactiveX. It provides an implementation of the ReactiveX project in Java. Following are the key characteristics of RxJava.

Extends the observer pattern.
Supports sequences of data/events.
Provides operators to compose sequences together declaratively.
Handles threading, synchronization, thread-safety and concurrent data structures internally.

What is ReactiveX?

ReactiveX is a project that aims to bring reactive programming concepts to various programming languages. Reactive programming refers to the scenario where a program reacts as and when data appears. It is an event-based programming concept in which events propagate to registered observers.

The ReactiveX project describes itself as follows: "The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming."

Functional Programming

Functional programming revolves around building software using pure functions. A pure function does not depend on previous state and always returns the same result for the same parameters. Pure functions help avoid the problems associated with shared objects, mutable data and side effects that are often prevalent in multi-threaded environments.

Reactive Programming

Reactive programming refers to event-driven programming in which data streams arrive asynchronously and are processed as they arrive.

Functional Reactive Programming

RxJava implements both concepts together: the data in a stream changes over time, and consumer functions react accordingly.

The Reactive Manifesto

The Reactive Manifesto is an online document that sets out a high standard for application software systems. According to the manifesto, following are the key attributes of reactive software −

Responsive − Should always respond in a timely fashion.
Message Driven − Should use asynchronous message-passing between components so that they maintain loose coupling.
Elastic − Should stay responsive even under high load.
Resilient − Should stay responsive even if one or more components fail.

Key components of RxJava

RxJava has two key components: Observable and Observer.

Observable − Represents an object similar to a Stream. It can emit zero or more items, can send an error message, can have its emission rate controlled while emitting a set of data, and can send finite as well as infinite data.

Observer − Subscribes to an Observable's sequence of data and reacts to each item. Observers are notified whenever the Observable emits an item. An Observer handles items one at a time. An Observer is never notified if no items are present, or before its callback for the previous item has returned.

RxJava – Environment Setup

Local Environment Setup

RxJava is a library for Java, so the very first requirement is to have a JDK installed on your machine.

System Requirement

JDK                 1.8 or above, since the examples in this tutorial use lambda expressions (RxJava 2 itself targets Java 6 and later).
Memory              No minimum requirement.
Disk Space          No minimum requirement.
Operating System    No minimum requirement.

Step 1 – Verify Java Installation in Your Machine

First of all, open the console and execute the java command for the operating system you are working on.

OS        Task                     Command
Windows   Open Command Console     c:\> java -version
Linux     Open Command Terminal    $ java -version
Mac       Open Terminal            machine:~ joseph$ java -version

Let's verify the output for all the operating systems −

OS        Output
Windows   java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101)
Linux     java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101)
Mac       java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101)

If you do not have Java installed on your system, download the Java Software Development Kit (SDK) from the following link: https://www.oracle.com. We are assuming Java 1.8.0_101 as the installed version for this tutorial.
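The installed Java version can also be read from inside a program through the standard java.version system property. The following is a small sketch under the assumption that the version string looks like "1.8.0_101" (the legacy scheme) or "17.0.2" (the newer scheme); JavaVersionCheck and majorVersion are hypothetical names for this illustration.

```java
// Reads the running JVM's version and extracts its major number.
class JavaVersionCheck {

    // Returns 8 for "1.8.0_101" (legacy scheme) and 17 for "17.0.2".
    static int majorVersion(String version) {
        String[] parts = version.split("[._\\-+]");
        int first = Integer.parseInt(parts[0]);
        // Legacy version strings start with "1." followed by the major number.
        return (first == 1 && parts.length > 1) ? Integer.parseInt(parts[1]) : first;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        System.out.println("java.version = " + version);
        System.out.println("major version = " + majorVersion(version));
    }
}
```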
Step 2 – Set JAVA Environment

Set the JAVA_HOME environment variable to point to the base directory location where Java is installed on your machine. For example −

OS        Output
Windows   Set the environment variable JAVA_HOME to C:\Program Files\Java\jdk1.8.0_101
Linux     export JAVA_HOME=/usr/local/java-current
Mac       export JAVA_HOME=/Library/Java/Home

Append the Java compiler location to the system path.

OS        Output
Windows   Append the string C:\Program Files\Java\jdk1.8.0_101\bin to the end of the system variable Path.
Linux     export PATH=$PATH:$JAVA_HOME/bin/
Mac       Not required

Verify the Java installation using the command java -version as explained above.

Step 3 – Download RxJava2 Archive

Download the latest version of the RxJava jar file from RxJava @ MVNRepository, and its dependency from Reactive Streams @ MVNRepository. At the time of writing this tutorial, we downloaded rxjava-2.2.4.jar and reactive-streams-1.0.2.jar and copied them into the C:\RxJava folder.

OS        Archive name
Windows   rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Linux     rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Mac       rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

Step 4 – Set RxJava Environment

Set the RX_JAVA environment variable to point to the base directory location where the RxJava jar is stored on your machine. Let's assume we've stored rxjava-2.2.4.jar and reactive-streams-1.0.2.jar in the RxJava folder.

Sr.No   OS        Description
1       Windows   Set the environment variable RX_JAVA to C:\RxJava
2       Linux     export RX_JAVA=/usr/local/RxJava
3       Mac       export RX_JAVA=/Library/RxJava

Step 5 – Set CLASSPATH Variable

Set the CLASSPATH environment variable to point to the RxJava jar location.

Sr.No   OS        Description
1       Windows   Set the environment variable CLASSPATH to %CLASSPATH%;%RX_JAVA%\rxjava-2.2.4.jar;%RX_JAVA%\reactive-streams-1.0.2.jar;.;
2       Linux     export CLASSPATH=$CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:$RX_JAVA/reactive-streams-1.0.2.jar:.
3       Mac       export CLASSPATH=$CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:$RX_JAVA/reactive-streams-1.0.2.jar:.
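One way to confirm that the CLASSPATH setting took effect is to ask the JVM whether it can find a class from each jar. The sketch below uses only the standard library; ClasspathCheck and isOnClasspath are hypothetical names for this illustration, and the two class names checked are io.reactivex.Flowable (from the RxJava jar) and org.reactivestreams.Publisher (from the Reactive Streams jar).

```java
// Reports whether the RxJava and Reactive Streams jars are visible to the JVM.
class ClasspathCheck {

    // Returns true if the named class can be located without initializing it.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] expected = {"io.reactivex.Flowable", "org.reactivestreams.Publisher"};
        for (String className : expected) {
            System.out.println(className + " found: " + isOnClasspath(className));
        }
    }
}
```

If either line reports false, the corresponding jar path in CLASSPATH is the first thing to recheck.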
Step 6 – Test RxJava Setup

Create a class TestRx.java as shown below −

    import io.reactivex.Flowable;

    public class TestRx {
        public static void main(String[] args) {
            // Emit a single item and print it
            Flowable.just("Hello World!")
                .subscribe(System.out::println);
        }
    }

Step 7 – Verify the Result

Compile the class using the javac compiler as follows −

    C:\RxJava>javac TestRx.java

Now run TestRx as follows −

    C:\RxJava>java TestRx

Verify the output.

    Hello World!

RxJava – How Observable works

Observables represent the sources of data, whereas Observers (Subscribers) listen to them. In a nutshell, an Observable emits items and a Subscriber then consumes these items.

Observable

An Observable provides data once a subscriber starts listening.
An Observable can emit any number of items.
An Observable can also emit only a completion signal, with no items.
An Observable can terminate successfully.
An Observable may never terminate, e.g. a button can be clicked any number of times.
An Observable may throw an error at any point in time.

Subscriber

An Observable can have multiple subscribers.
When an Observable emits an item, each subscriber onNext() method