-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Observable
In RxJava an object that implements the Observer interface subscribes to an object of the Observable class. Then that subscriber reacts to whatever item or sequence of items the Observable object emits. This pattern facilitates concurrent operations because it does not need to block while waiting for the Observable to emit objects, but instead it creates a sentry in the form of a subscriber that stands ready to react appropriately at whatever future time the Observable does so.
This page explains what the reactive pattern is and what Observables and observers are (and how observers subscribe to Observables). Subsequent child pages (as shown in sidebar) show how you use the variety of Observable operators to link Observables together and change their behaviors.
This documentation accompanies its explanations with "marble diagrams." Here is how marble diagrams represent Observables and transformations of Observables:
In many software programming tasks, you more or less expect that the instructions you write will execute and complete incrementally, one-at-a-time, in order as you have written them. But in the Rx paradigm, many instructions execute in parallel and their results are later captured, in arbitrary order, by “observers.” In these cases, rather than calling a method, you define a mechanism for retrieving and transforming the data, in the form of an “Observable,” and then subscribe an observer to it, at which point the previously-defined mechanism fires into action with the observer standing sentry to capture and respond to its emissions whenever they arrive.
An advantage of this approach is that when you have a bunch of tasks to do that are not dependent on each other, you can start them all at the same time rather than waiting for each one to finish before starting the next one --- that way, your entire bundle of tasks only takes as long to complete as the longest task in the bundle.
There are many terms used to describe this model of asynchronous programming and design. This document will use the following terms: A Subscriber (or sometimes Observer) subscribes to an object of the Observable class; that is, you subscribe a Subscriber to an Observable. An Observable emits items or sends notifications to its Subscribers by invoking the Subscribers’ methods.
In other documents and other contexts, what we are calling a “Subscriber” (or sometimes “Observer”) is sometimes called a “watcher” or “reactor.” This model in general is often referred to as the “reactor pattern”.
This document usually uses Groovy for code examples, but you can use RxJava in any JVM language — such as Clojure, Scala, or JRuby, or in Java itself.
In an ordinary method call — that is, not the sort of asynchronous, parallel calls typical in reactive programming — the flow is something like this:
- Call a method.
- Store the return value from that method in a variable.
- Use that variable and its new value to do something useful.
Or, something like this:
// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal
In the asynchronous model the flow goes more like this:
- Define a method that does something useful with the return value from the asynchronous call, this method is part of the Subscriber.
- Define the asynchronous call itself as an object of the Observable class.
- Attach the Subscriber to that Observable by subscribing it (this also initiates the method call).
- Go on with your business; whenever the call returns, the Subscriber’s method will begin to operate on its return value or values — the items emitted by the Observable.
Which looks something like this:
// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the Subscriber is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business
The subscribe()
method may accept one to three methods, or it may accept a Subscriber
object or any other object that implements the Observer
interface (which includes these three methods):
onNext: An Observable will call this method of its Subscribers whenever the Observable emits an item. This method takes as a parameter the item emitted by the Observable.
onError: An Observable will call this method of its Subscribers to indicate that it has failed to generate the expected data or has encountered some other error. This stops the Observable and it will not make further calls to onNext
or onCompleted
. The onError
method takes as its parameter the Throwable
that caused the error (or a CompositeException
in those cases where there may have been multiple errors).
onCompleted: An Observable will invoke this method of its Subscribers after it has called onNext
for the final time, if it has not encountered any errors.
A more complete subscribe()
example would therefore look like this:
def myOnNext = { item -> /* do something useful with item */ };
def myError = { throwable -> /* react sensibly to a failed call */ };
def myComplete = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business
There is also an onStart()
method in the Subscriber
class in which you can put any code you want to be executed after the Subscriber
and Observable
have connected but before the Observable
has yet emitted any items (or sent any notifications) to the Subscriber
. This is a useful place to establish backpressure, which is discussed at another page on this wiki.
An object of the Subscriber
class also implements the unsubscribe()
method (this method is not part of the Observer
interface, so objects that implement that interface may or may not implement this method). You can call this method to indicate that the Subscriber is no longer interested in any of the Observables it is currently subscribed to. Those Observables can then (if they have no other interested subscribers) choose to stop generating new items to emit.
The results of this unsubscription will cascade back through the chain of operators that applies to the Observable that the Subscriber subscribed to, and this will cause each link in the chain to stop emitting items. This is not guaranteed to happen immediately, however, and it is possible for an Observable to generate and attempt to emit items for a while even after no Subscribers remain to observe these emissions.
The names of methods and classes in RxJava hew close to those in Microsoft’s Reactive Extensions. This has led to some confusion, as some of these names have different implications in other contexts, or seem awkward in the idiom of a particular implementing language.
For example there is the onEvent
naming pattern (e.g. onNext
, onCompleted
, onError
). In many contexts such names would indicate methods by means of which event handlers are registered. In the RxJava Subscriber context, however, they name the event handlers themselves.
When does an Observable begin emitting its sequence of items? It depends on the Observable. A “hot” Observable may begin emitting items as soon as it is created, and so any observer who subscribes to that Observable may start observing the sequence somewhere in the middle. A “cold” Observable, on the other hand, waits until an observer subscribes to it before it begins to emit items, and so such an observer is guaranteed to see the whole sequence from the beginning.
In RxJava, there is also something called a “Connectable” Observable. Such an Observable does not begin emitting items until its connect( )
method is called, whether or not any observers have subscribed to it.
The Observable
/Subscriber
/Observer
classes along with onNext
/onError
/onCompleted
are only the start of RxJava. By themselves they’d be nothing more than a slight extension of the standard observer pattern, better suited to handling a sequence of events rather than a single callback.
The real power comes with the “reactive extensions” (hence “Rx”) — operators that allow you to transform, combine, manipulate, and work with the sequences of items emitted by Observables.
These Rx operators allow you to compose asynchronous sequences together in a declarative manner with all the efficiency benefits of callbacks but without the drawbacks of nesting callback handlers that are typically associated with asynchronous systems.
This documentation groups information about the various operators and examples of their usage into the following pages (these are also listed in the sidebar):
- Creating
- Transforming
- Filtering
- Combining
- Error Handling
- Utility
- Conditional and Boolean
- Mathematical and Aggregate
- Asynchronous Conversion
- Connectable Observables
- Blocking Observables
- String Observables
These pages include information about some operators that are not part of the core of RxJava but are implemented in a variety of contributed modules that you must include separately as part of your project.
Copyright (c) 2016-present, RxJava Contributors.
Twitter @RxJava | Gitter @RxJava