RxPY – Working with Observables

An observable is a function that creates an observer and attaches it to a source where values are expected, for example clicks, mouse events from a DOM element, etc.

The topics mentioned below will be studied in detail in this chapter.

Create Observables
Subscribe and Execute an Observable

Create Observables

To create an observable we will use the create() method and pass it a function that has the following items.

on_next() − This function gets called when the observable emits an item.
on_completed() − This function gets called when the observable is complete.
on_error() − This function gets called when an error occurs on the observable.

To work with the create() method, first import it as shown below −

```python
from rx import create
```

Here is a working example that creates an observable −

testrx.py

```python
from rx import create

def test_observable(observer, scheduler):
    observer.on_next("Hello")
    observer.on_error("Error")
    observer.on_completed()

source = create(test_observable)
```

Subscribe and Execute an Observable

To subscribe to an observable, we need to use the subscribe() function and pass it the callback functions on_next, on_error and on_completed. Here is a working example −

testrx.py

```python
from rx import create

def test_observable(observer, scheduler):
    observer.on_next("Hello")
    observer.on_completed()

source = create(test_observable)

source.subscribe(
    on_next=lambda i: print("Got - {0}".format(i)),
    on_error=lambda e: print("Error : {0}".format(e)),
    on_completed=lambda: print("Job Done!"),
)
```

The subscribe() method takes care of executing the observable. The callback functions on_next, on_error and on_completed are passed to the subscribe() method, and the call to subscribe() in turn executes the test_observable() function. It is not mandatory to pass all three callback functions to subscribe(); you can pass on_next(), on_error() and on_completed() as per your requirements.
The lambda functions used for on_next, on_error and on_completed take in their arguments and execute the expression given. Here is the output of running testrx.py −

```
E:\pyrx>python testrx.py
Got - Hello
Job Done!
```

RxPY – Concurrency Using Scheduler

One important feature of RxPY is concurrency, i.e. allowing tasks to execute in parallel. To make that happen, we have two operators, subscribe_on() and observe_on(), which work with a scheduler that decides the execution of the subscribed task.

Here is a working example that shows the need for subscribe_on(), observe_on() and a scheduler.

Example

```python
import random
import time

import rx
from rx import operators as ops

def adding_delay(value):
    time.sleep(random.randint(5, 20) * 0.1)
    return value

# Task 1
rx.of(1, 2, 3, 4, 5).pipe(
    ops.map(lambda a: adding_delay(a))
).subscribe(
    lambda s: print("From Task 1: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 1 complete")
)

# Task 2
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a))
).subscribe(
    lambda s: print("From Task 2: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 2 complete")
)

input("Press any key to exit\n")
```

In the above example, I have 2 tasks: Task 1 and Task 2. The tasks execute in sequence; the second task starts only when the first task is done.

Output

```
E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete
```

RxPY supports many schedulers; here we are going to make use of ThreadPoolScheduler, which manages the CPU threads available. In the example above we ran without a scheduler; next, we are going to use the multiprocessing module to get the cpu_count. The count is given to the ThreadPoolScheduler, which gets the tasks working in parallel based on the threads available.
Here is a working example −

```python
import multiprocessing
import random
import time
from threading import current_thread

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops

# calculate cpu count, which is used to create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))

def adding_delay(value):
    time.sleep(random.randint(5, 20) * 0.1)
    return value

# Task 1
rx.of(1, 2, 3, 4, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 1: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 1 complete")
)

# Task 2
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 2: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 2 complete")
)

input("Press any key to exit\n")
```

In the above example, I have 2 tasks and the cpu_count is 4. Since there are 2 tasks and 4 threads available, both tasks can start in parallel.

Output

```
E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete
```

As the output shows, both tasks started in parallel. Now consider a scenario where there are more tasks than the CPU count, i.e. the CPU count is 4 and there are 5 tasks. In this case, we need to check whether any thread has become free after task completion, so that it can be assigned to the next task waiting in the queue. For this purpose, we can use the observe_on() operator, which observes the scheduler for free threads.
Here is a working example using observe_on() −

Example

```python
import multiprocessing
import random
import time
from threading import current_thread

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops

# calculate cpu count, which is used to create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))

def adding_delay(value):
    time.sleep(random.randint(5, 20) * 0.1)
    return value

# Task 1
rx.of(1, 2, 3, 4, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 1: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 1 complete")
)

# Task 2
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 2: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 2 complete")
)

# Task 3
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 3: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 3 complete")
)

# Task 4
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 4: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 4 complete")
)

# Task 5
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.observe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 5: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 5 complete")
)

input("Press any key to exit\n")
```

Output

```
E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete
```

As the output shows, the moment Task 4 completes, its thread is given to the next task, i.e. Task 5, which then starts executing.