Understanding observeOn() and subscribeOn()

A few people have asked if I could cover observeOn() and subscribeOn() the same way I covered parallelization. Let's take a hands-on approach to understanding them. But first, here is a little theory for newcomers to multithreading.

Concurrency and Multithreading in a Nutshell

If you have sizable experience with Java concurrency, please skip this section. Otherwise read on!
If you have never done multithreading/concurrency before, the idea is essentially multitasking. Think of a thread as a cursor executing one line of code at a time, which you can see for yourself when you use breakpoints in debug mode with Eclipse or IDEA. As you step through your code, each statement is executed top-to-bottom. In effect, you have slowed down a thread and are now in control of it. You only have one thread traversing your code and executing each statement.
But when you multithread, you can have two or more threads (cursors) traversing your code and executing statements. For instance, you can have three threads doing three different tasks. Two threads could be importing two different sets of data simultaneously, while the third thread is asking the user for login credentials. These three tasks are being executed at the same time, and this is much better than having the user wait for each data set to be loaded before being prompted with a login.
However, when these three threads enter the same object and manipulate its properties, there is a high risk of the cursors overlapping. They can start to race each other and compete chaotically to evaluate and change the object's properties. That is why immutability should be your default policy with properties, and when properties have to be mutable you should use synchronization. While you should always strive for immutability, RxJava greatly reduces the likelihood of race conditions and other multithreading problems. Problems are only likely to arise when you create side effects that manipulate objects outside the Observable chain.
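To make the race concrete, here is a minimal plain-Java sketch (no RxJava involved). The class and method names are made up for illustration. Several threads increment a shared counter: the unsynchronized version can lose updates, while the synchronized version always reaches the expected total.

```java
class CounterRaceSketch {

    // Hypothetical shared state with no protection: count++ is a
    // read-modify-write, so concurrent calls can overwrite each other.
    static class UnsafeCounter {
        private int count = 0;
        void increment() { count++; }
        int get() { return count; }
    }

    // Same state, but only one thread at a time may enter each method.
    static class SafeCounter {
        private int count = 0;
        synchronized void increment() { count++; }
        synchronized int get() { return count; }
    }

    // Starts `threads` threads that each run `task` `perThread` times,
    // then waits for all of them to finish.
    static void runConcurrently(int threads, int perThread, Runnable task) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    task.run();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    static int unsafeTotal(int threads, int perThread) {
        UnsafeCounter counter = new UnsafeCounter();
        runConcurrently(threads, perThread, counter::increment);
        return counter.get();
    }

    static int safeTotal(int threads, int perThread) {
        SafeCounter counter = new SafeCounter();
        runConcurrently(threads, perThread, counter::increment);
        return counter.get();
    }

    public static void main(String[] args) {
        // The unsafe total may come out lower than 300000 because of lost updates.
        System.out.println("Unsafe total: " + unsafeTotal(3, 100_000));
        System.out.println("Safe total:   " + safeTotal(3, 100_000));
    }
}
```

The unsafe result varies from run to run, which is exactly what makes race conditions so hard to track down.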
Another common use of multithreading is parallelization. Say you have 10,000 objects and you need to perform an expensive calculation on each one. Instead of iterating over them and executing the process one at a time, you can process 5 at a time by passing them to 5 worker threads. This could make the process up to 5 times faster.
Think of this as a checkout line where having 5 cashiers is better than 1, because you can process a high volume of customers faster. But of course, like threads, cashiers are expensive. If you have 30 customers to process, it is probably not practical to have 30 cashiers due to operational constraints. It is better to have 5 cashiers and "re-use" them after they process each customer, and they can take another customer waiting in the queue. This is effectively what a thread pool does. It internally maintains a set of threads and will queue tasks to delegate to them.
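The checkout analogy can be sketched with a plain java.util.concurrent thread pool. This is only an illustration; the class name and the serve() helper are hypothetical. Thirty "customers" are queued up, but no more than five "cashier" threads are ever created to serve them.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CheckoutPoolSketch {

    // Queues `customers` tasks on a pool of `cashiers` threads and returns
    // how many customers each thread (by name) ended up serving.
    static Map<String, Integer> serve(int customers, int cashiers) {
        ExecutorService pool = Executors.newFixedThreadPool(cashiers);
        Map<String, Integer> servedByThread = new ConcurrentHashMap<>();

        for (int i = 0; i < customers; i++) {
            // Each task records which pooled thread picked it up.
            pool.execute(() ->
                servedByThread.merge(Thread.currentThread().getName(), 1, Integer::sum));
        }

        // Stop accepting new tasks and wait for the queued ones to finish.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for tasks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return servedByThread;
    }

    public static void main(String[] args) {
        serve(30, 5).forEach((thread, count) ->
            System.out.println(thread + " served " + count + " customers"));
    }
}
```

How the customers are split between the threads differs from run to run, but the pool never grows beyond the five threads it was given.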
Most computers nowadays have multiple "cores", or processors built into the CPU. If you have a quad-core, you can optimally support 5 computational threads (4 cores + 1 extra thread for idle time). If you have 8 cores, you can optimally support 9 threads, and so on. If you exceed this simple rough formula (e.g. running 6 threads or more on a 4-core machine) you risk compromising performance. But not every task is computational. Importing and exporting data (called IO tasks) is much less taxing on the CPU. You could theoretically have 10 threads on a 4-core machine without issue if they all are simply importing/exporting data.
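As a rough sketch of that rule of thumb, you can ask the JVM how many processors it sees and add one. The class and method names here are hypothetical, and the formula is just the cores-plus-one heuristic described above, not a guarantee of optimal performance.

```java
class PoolSizingSketch {

    // Rule of thumb from the text: one thread per core, plus one spare.
    static int computationThreads(int cores) {
        return cores + 1;
    }

    public static void main(String[] args) {
        // Reports the processors visible to the JVM, which may count
        // hyper-threaded logical cores rather than physical ones.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Cores: " + cores
            + ", suggested computation threads: " + computationThreads(cores));
    }
}
```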
While RxJava takes a lot of the pain out of concurrency and multithreading, I highly recommend knowing how to use concurrency without RxJava, just so you are aware of the "gotchas" multithreading can manifest. Benjamin Winterberg created an awesome tutorial on Java 8 concurrency which I recommend reading. If you want some deep knowledge on concurrency, check out Java Concurrency in Practice by Brian Goetz.

subscribeOn()

Before we bring concurrency into the discussion, think long and hard about how an Observable chain of operators actually works. You have to have a source Observable where the emissions originate. Only one emission at a time can be pushed through the entire chain of operators to the Subscriber. By default, the thread that declares the subscription (the one that calls the subscribe() method) is the thread that pushes these emissions from the source, one at a time, all the way to the Subscriber.
For example, take this simple Observable operation that emits three String objects and maps their lengths.
public static void main(String[] args) {

    Observable<String> source = Observable.just("Alpha", "Beta", "Gamma");

    Observable<Integer> lengths = source.map(String::length);

    lengths
        .subscribe(l -> System.out.println("Received " + l +
            " on thread " + Thread.currentThread().getName()));
}
The subscribe() call at the end of the chain receives the emissions and prints them. Because we have not specified a Scheduler, the thread that calls subscribe() is the one that pushes items from the source all the way through the chain to the Subscriber. If you run this program you will see the following output, indicating this Observable emitted items on the main thread.
Received 5 on thread main
Received 4 on thread main
Received 5 on thread main
This means the main thread (the thread which started the program) executed the emissions of this Observable, and it pushed each emission through the map() operator to the Subscriber. Since the main thread becomes occupied with pushing the emissions, the program will not exit until the Observable is done pushing emissions and calls onCompleted().
Let's say we wanted to subscribe to this Observable but we do not want to do it on the current main thread. Pretend calculating the lengths takes a while. Perhaps we would like to kick off the calculations but not hold up the main thread. That main thread has places to go, things to do, and needs to kick off other tasks. We can do that with the subscribeOn() operator by specifying a Scheduler. This will emit the items from the source on a different thread.
If our task is computational, we should use Schedulers.computation(). This will allocate one thread from a fixed-size pool to this Observable operation, and the source will emit items on that thread.
public static void main(String[] args) {

    Observable<String> source = Observable.just("Alpha", "Beta", "Gamma");

    // Ask the source to emit on a computation thread instead of main
    Observable<Integer> lengths = source
        .subscribeOn(Schedulers.computation())
        .map(String::length);

    lengths
        .subscribe(length -> System.out.println("Received " + length +
            " on thread " + Thread.currentThread().getName()));
}
But you may run into a problem and not get any output. Why? With our simple program, the main thread passed off the execution of our Observable chain to a computation thread. The main thread then reached the end of the main() method and the program exited before the computation thread got a chance to emit any items! RxJava's Scheduler threads are daemon threads, so they do not keep the JVM alive on their own.
You will not likely encounter this with real programs that are kept alive for a session, but for our example we need to keep our main thread alive long enough to see the subscription work. Just make it sleep for three seconds and that should give plenty of time to subscribe and execute the emissions.
public static void main(String[] args) {

    Observable<String> source = Observable.just("Alpha", "Beta", "Gamma");

    Observable<Integer> lengths = source
        .subscribeOn(Schedulers.computation())
        .map(String::length);

    lengths
        .subscribe(length -> System.out.println("Received " + length +
            " on thread " + Thread.currentThread().getName()));

    // Keep the main thread alive long enough for the emissions to happen
    sleep(3000);
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
Your output should now look like this.
Received 5 on thread RxComputationThreadPool-1
Received 4 on thread RxComputationThreadPool-1
Received 5 on thread RxComputationThreadPool-1
Note that the emissions happened on a computation thread, or more specifically a thread named RxComputationThreadPool-1. This thread emitted all these items. A common misconception is that multiple threads will automatically parallelize your emissions, but this is not true, as it would break the Observable contract. You can only direct emissions of an Observable from one single thread to another single thread. Parallelization is only possible when you create separate Observables, as I showed when covering parallelization.
It does not matter where in your Observable chain you put the subscribeOn(). No matter where you put it, it will tell the source Observable which thread to emit items on. If you specify multiple subscribeOn() operators, the one closest to the source (the left-most) will be the one used. As a matter of fact, a few source Observable factories, like Observable.interval(), already choose a Scheduler internally. Observable.interval() will emit on the computation Scheduler by default, and any subscribeOn() you specify on it will have no effect on where it emits.
In summary, subscribeOn() instructs the source Observable which thread to emit items on, and this thread will push items all the way to the Subscriber. However, if it encounters an observeOn() somewhere in the chain (discussed shortly), it will then pass emissions to another thread for the remaining operations at that point.

Choosing a Scheduler

There are several other Schedulers such as Schedulers.io(), which is optimal for IO-related tasks (and it caches and re-uses threads to increase efficiency). Then there is Schedulers.newThread(), which simply creates a new thread for each subscription. You have to be careful with both of these because in theory they could create an unlimited number of threads, which can hurt performance. For computational tasks, you should use Schedulers.computation() so the number of threads is limited based on the number of cores your machine has.
You can also use Schedulers.from() to specify your own Executor. Especially for parallelization, I found this approach to have better performance.

observeOn()

It is helpful to instruct a source Observable which Scheduler to use via subscribeOn(), and the source Observable will emit items on one of that Scheduler's threads. However, it is often helpful in the middle of an Observable chain to switch to another Scheduler. For example, you may press a button on a UI and it kicks off work on a computation thread, which frees up the UI thread so the UI does not freeze. But when the computation is done, the result needs to be displayed back on the UI. When you are working with UI technologies like JavaFX, Swing, or Android, you have to update the UI on the toolkit's own UI thread (the Event Dispatch Thread in Swing, for example).
Take this example. We emit the numbers 1 through 10 and do some simple multiplication on them. By default the emissions happen on the main thread since we do not specify a subscribeOn(). But before the map(i -> i * 10) operation we switch the emissions over to a computation thread.
public static void main(String[] args) {

    Observable<Integer> source = Observable.range(1, 10);

    source
        .map(i -> i * 100)
        .doOnNext(i -> System.out.println("Emitting " + i
            + " on thread " + Thread.currentThread().getName()))
        // Everything below this point runs on a computation thread
        .observeOn(Schedulers.computation())
        .map(i -> i * 10)
        .subscribe(i -> System.out.println("Received " + i + " on thread "
            + Thread.currentThread().getName()));

    sleep(3000);
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
If you run this code you should get this output.
Emitting 100 on thread main
Emitting 200 on thread main
Emitting 300 on thread main
Emitting 400 on thread main
Emitting 500 on thread main
Emitting 600 on thread main
Emitting 700 on thread main
Emitting 800 on thread main
Emitting 900 on thread main
Emitting 1000 on thread main
Received 1000 on thread RxComputationThreadPool-3
Received 2000 on thread RxComputationThreadPool-3
Received 3000 on thread RxComputationThreadPool-3
Received 4000 on thread RxComputationThreadPool-3
Received 5000 on thread RxComputationThreadPool-3
Received 6000 on thread RxComputationThreadPool-3
Received 7000 on thread RxComputationThreadPool-3
Received 8000 on thread RxComputationThreadPool-3
Received 9000 on thread RxComputationThreadPool-3
Received 10000 on thread RxComputationThreadPool-3
You will see the emissions initially occurred on the main thread and were pushed on that thread all the way to the first map(). But after that the observeOn() redirected the emissions to a computation thread, which pushed the emissions to the second map() and the final Subscriber.
Still not clear? Let's look at this visually. No matter what Scheduler you are subscribed on, only one emission is allowed to travel up the Observable chain of operators at a time. Below, you can observe that the emission must be pushed all the way from the source to the Subscriber before the next emission can start. 

Let's say we wanted to switch to another thread after Operator 2. Perhaps we finished calculating something and now we want to update the UI on the UI thread. Or maybe we finished importing a data set on the io() Scheduler and now we want to do computations on the computation() Scheduler.
You can do this with the observeOn() operator as shown below. Notice how the bottom stream passed an emission to the top stream, allowing the bottom stream to start the next emission without waiting for the current one to reach the Subscriber.

The bottom stream represents a stream of operators on one Scheduler, and the top one represents another. Once an emission is passed from the bottom stream to the top one, the bottom stream is no longer concerned with it. It is now the top stream's responsibility to get that emission to the Subscriber.
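The handoff between the two streams can be imitated in plain Java with two single-threaded executors. This is only an analogy under my own assumptions, not how RxJava implements observeOn() internally, and the class and thread names are invented for the sketch. The "bottom" thread produces each value and passes it to the "top" thread, then immediately moves on to the next value.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class HandoffSketch {

    // Produces `items` values on a "bottom" thread and hands each one to a
    // "top" thread for the remaining work. Returns a log of what ran where.
    static List<String> run(int items) {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService bottom = Executors.newSingleThreadExecutor(r -> new Thread(r, "bottom"));
        ExecutorService top = Executors.newSingleThreadExecutor(r -> new Thread(r, "top"));

        bottom.execute(() -> {
            for (int i = 1; i <= items; i++) {
                int value = i * 100; // work done before the thread switch
                log.add("Emitting " + value + " on " + Thread.currentThread().getName());
                // Hand the value off; the bottom thread does not wait for the result.
                top.execute(() -> log.add(
                    "Received " + (value * 10) + " on " + Thread.currentThread().getName()));
            }
        });

        // Finish the producer first so that every handoff has been queued.
        finish(bottom);
        finish(top);
        return log;
    }

    private static void finish(ExecutorService executor) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for tasks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

The "Emitting" and "Received" lines may interleave differently on each run, but each thread handles its own items strictly in order, one at a time.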
One problem that may arise with observeOn() is that the bottom stream can produce emissions faster than the top stream can process them. This can lead to backpressure problems that you may have to handle.
Effectively, you can only use one subscribeOn(), but you can have any number of observeOn() operators. You can switch emissions from one thread to another with ease using observeOn(). But do not use it everywhere just for its own sake. Only use it when you find a calculation is intense enough that it needs to be offloaded to another thread, or when the work has to continue on a specific thread such as the UI thread.
For UI technologies, there are a couple of libraries that bridge RxJava with a UI Scheduler. For example, there is RxJavaFX which has a Scheduler that puts emissions on the JavaFX Platform thread. There is also the RxJava Android Module which has Schedulers for Android. There is even RxSwing for those of us stuck with legacy Swing applications. These are very helpful to use in conjunction with observeOn() when working with user interfaces.
Reviewed by Anonymous on February 14, 2016