Request & Response
Since every value can be a stream in Rx, a url can also be a stream, so we can create a stream that will capture everytime a url is requested by doing:
var requestStream = Rx.Observable.just('https://api.github.com/users');
The url is now an observable stream, and each time this value is emitted we need to invoke some functionality, which we do by subscribing to the value.
requestStream.subscribe(function(requestUrl) { //see below
But, as of now we can only subscribe to the url value being invoked, we want to subscribe also to the response. So well need to upgrade the insides of hour observer function:
requestStream.subscribe(function(requestUrl) {
// 1. requestStream.subscribe emits the url value, and enters into the observer function.
// 2. inside the observer function we create another new Observable but this time passing it a function as value (Rx.Observable.create)
// 3. inside the function value of the new Observable we make a server request using a promise, passing it the value emitted to us from step 1.
// 4. inside the promises callbacks (done,fail,always) we notify the observer on the corresponding server response.
// 5. after defining our new Observable, we subscribe to it, which will emit the function value to execute, doing a promise based server call, then emiting
// the corresponding server response into the subscriptions callback, where we can complete our procedure as needed.
var responseStream = Rx.Observable.create(function (observer) {
jQuery.getJSON(requestUrl)
.done(function(response) { observer.onNext(response); })
.fail(function(jqXHR, status, error) { observer.onError(error); })
.always(function() { observer.onCompleted(); });
});
responseStream.subscribe(function(response) {
// do something with the response
});
}
We create another observable stream, that will invoke a value, this time not a url string but a function.
The invoked function will do an XHR using a jQuery promise that will resolve in ether .done() or .fail()
Done – observer.onNext notify other subscribers on the data events
Fail – notify subscribers on errors
onCompleted – notifies an end result
We can see that the promise inside our stream manages the flow of the event, resolving it by notifying other subscribers about returned value, errors and the end of the event.
Is a promise an observable ?
Yes! A promise can be viewed as a single value stream, meaning you can emit a single value (the promise) which will be resolved by a subscriber (the .then()/.done() function) the difference is that a stream enables continuous values to be emited unlike a promise which resolves then waits to be subscribed to again.
The two do not conflict and can be used together, and you can convert a promise to a stream using Rx.Observable.fromPromise(promise)
Meta stream
As of now the responseStream is depandant on the nested Promise and its callbacks, then it is also dependent on the nested subscriptions, and its callback, this is equivalent to "callback hell".
What we need to do is map each responseStream using a map function to return a new stream (Observable) from a promise (jQuery.getJSON).
The result:
var responseMetastream = requestStream
.map(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
now responseStream (responseMetastream ) is a stream of streams, returning an observable stream for each new request made to the server.