This project was generated with Angular CLI version 13.3.0.
Rx stands for Reactive Extensions, a family of libraries for reactive programming, which means programming with asynchronous data streams.
A stream is a sequence of data that arrives over a period of time. A stream can carry anything: variables, user inputs, cache entries, data structures, etc.
At any point, a data stream may emit any of the following three events:
- Value : The next value in the stream.
- Error : An error has occurred and has stopped the stream.
- Complete : The stream has ended and no more values will arrive.
Reactive programming is all about creating streams, emitting values, error or complete notifications, and manipulating and transforming the data.
RxJS (Reactive Extensions Library for JavaScript) is a JavaScript library that allows us to work with asynchronous data streams.
Angular uses the RxJS library heavily in its framework to implement reactive programming. For example,
- Reacting to an HTTP request in Angular, i.e. by subscribing.
- Value changes / status changes in reactive forms.
- Custom events that send Observable output data from a child component to a parent component.
Observables: An Observable is a function that emits single or multiple values over time, either synchronously or asynchronously.
Observers: An Observable on its own is useless unless someone consumes the values it emits. We call these consumers Observers or Subscribers.
- The simplest way to create an Observable is using the Observable constructor. The Observable constructor takes a callback function as its argument, and that callback receives a subscriber as its argument. This callback function will run when this Observable's subscribe method executes.
- next(): The Observable invokes the next() callback whenever a value arrives in the stream. It passes the value as an argument to this callback.
- error(): Sends a JS Error / Exception as the argument. No further value is emitted. The stream stops.
- complete(): The Observable invokes this when the stream completes. After the complete() notification, no further value is emitted to the subscriber.
Syntax :
import { Observable } from 'rxjs';
const obs = new Observable(subscriber => {
  console.log('Start emitting'); // runs each time subscribe() is called
  subscriber.next('Hi');
});
There are easier ways to create Observables using RxJS creation operators such as of, from and fromEvent.
Observers communicate with the Observable using callbacks. While subscribing to the Observable, an Observer can pass three optional callbacks: next, error and complete. We can pass these callbacks within an object as the argument to the subscribe() method. If we only expect the values emitted by the Observable, we can pass a single next callback without the object syntax.
Check app.component.ts for implementation of subscribing to Observables.
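As a rough illustration of the two subscribe styles described above, the sketch below builds an Observable with the constructor and subscribes to it twice. The names greetings$ and received are made up for this example and are not taken from app.component.ts.

```typescript
import { Observable } from 'rxjs';

// Collects everything the observers receive, so the order is easy to inspect.
const received: string[] = [];

// Hypothetical example stream: two values, then a complete notification.
const greetings$ = new Observable<string>(subscriber => {
  subscriber.next('Hi');
  subscriber.next('Hello');
  subscriber.complete();
  subscriber.next('ignored'); // not delivered: the stream has already completed
});

// Object syntax: each of the three callbacks is optional.
greetings$.subscribe({
  next: value => received.push(`next: ${value}`),
  error: err => received.push(`error: ${err}`),
  complete: () => received.push('complete'),
});

// Shorthand: only the next callback.
greetings$.subscribe(value => received.push(`shorthand: ${value}`));

console.log(received);
```

Because the constructor callback runs once per subscribe() call, both subscribers see the full sequence.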
Operators are functions that operate on an Observable and return a new Observable. We can manipulate the incoming Observable, filter it, merge it with another Observable or subscribe to another Observable.
We can also chain operators one after the other using pipe. Each operator in the chain gets the Observable from the previous operator, modifies it and creates a new Observable, which becomes the input for the next operator.
The following table lists some of the commonly used Operators.
| Operation | Operators |
|---|---|
| Combination | combineLatest, concat, merge, startWith, withLatestFrom, zip |
| Filtering | debounceTime, distinctUntilChanged, filter, take, takeUntil, takeWhile, takeLast, first, last, single, skip, skipUntil, skipWhile, skipLast |
| Transformation | bufferTime, concatMap, map, mergeMap, scan, switchMap, exhaustMap, reduce |
| Utility | tap, delay, delayWhen |
| Error Handling | throwError, catchError, retry, retryWhen |
| Multicasting | share |
- Observable.create() -> Calls the Observable constructor behind the scenes. create is a static method of the Observable class, so we don't have to import it separately. This method is deprecated. Use the constructor instead.
- of creates an Observable from the arguments we pass into it. We can pass any number of arguments to of. Each argument is emitted one after the other. It sends the complete signal in the end.
- from takes only one argument that can be iterated and converts it into an Observable. It sends the complete signal in the end.
Example Array: from([a,b,c]) => a->b->c->complete
Example String: from('Hello') => 'H'->'e'->'l'->'l'->'o'->complete
Observables from collections: Anything that can be iterated can be converted into an Observable using the from operator.
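A small sketch of of and from side by side may help here. The log array and the sample inputs are chosen only for illustration.

```typescript
import { from, of } from 'rxjs';

const log: string[] = [];

// of: every argument becomes one emission, followed by complete.
of(1, 2, 3).subscribe({
  next: v => log.push(`of: ${v}`),
  complete: () => log.push('of: complete'),
});

// from: a single iterable argument is unpacked into individual emissions.
from(['a', 'b', 'c']).subscribe(v => log.push(`array: ${v}`));
from('Hello').subscribe(v => log.push(`string: ${v}`));
from(new Set([10, 20])).subscribe(v => log.push(`set: ${v}`));

console.log(log);
```

Note that the string is emitted character by character with its original casing.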
- The fromEvent method allows us to create an Observable from DOM events directly.
- Arguments: the event target element first, the event name second.
Syntax :
fromEvent(this.button.nativeElement, 'click').subscribe({next: () => {}, complete: () => {}})
When we subscribe to an Observable created with the fromEvent method, it registers the event handler on the DOM element using addEventListener.
The pipe method of an Observable is used to chain multiple operators together. RxJS operators are functions that take an Observable as input, transform it into a new Observable and return it.
Each argument of the pipe method must be separated by a comma. The order of operators is important because, when a subscriber subscribes to the Observable, the operators execute in the order in which they were added.
There are two ways to use pipe: as an instance method of an Observable, or as a standalone function.
We chain the operators op1, op2 etc. that are passed as arguments to the pipe method. The output of op1 becomes the input of op2.
obs.pipe(
op1(),
op2(),
op3(),
)
Note: If the source emits multiple values, each value goes through the entire operator chain and is delivered to the subscriber before the next value is processed.
Refer to the pipeOperatorsUsingFilterMap() method.
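The per-value ordering can be made visible with a small trace. This is only a sketch with a synchronous source; the trace array is a name invented for this example and is not the project's pipeOperatorsUsingFilterMap() code.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Records which operator handled which value, in order.
const trace: string[] = [];

of(1, 2, 3, 4)
  .pipe(
    filter(n => {
      trace.push(`filter ${n}`);
      return n % 2 === 0; // keep even numbers only
    }),
    map(n => {
      trace.push(`map ${n}`);
      return n * 10;
    }),
  )
  .subscribe(n => trace.push(`subscriber ${n}`));

console.log(trace);
```

The value 2 passes through filter, map and the subscriber before 3 even reaches filter.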
We can also use pipe as a standalone function and reuse the resulting operator chain in other places. We need to import pipe from rxjs. Check the reusablePipe method for custom pipe creation.
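One possible shape for such a reusable chain is sketched below. The name squareEvens is hypothetical and the project's reusablePipe method may look different.

```typescript
import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical reusable chain: keep even numbers, then square them.
// The explicit parameter types are needed because there is no source
// Observable here for TypeScript to infer them from.
const squareEvens = pipe(
  filter((n: number) => n % 2 === 0),
  map((n: number) => n * n),
);

const firstRun: number[] = [];
const secondRun: number[] = [];

// The same chain is applied to two unrelated sources.
of(1, 2, 3, 4).pipe(squareEvens).subscribe(n => firstRun.push(n));
of(5, 6, 7, 8).pipe(squareEvens).subscribe(n => secondRun.push(n));

console.log(firstRun, secondRun);
```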
tap : The tap operator returns a new Observable that mirrors the source Observable. It is mostly used for debugging and other side effects, and it does not modify the stream in any way.
Example: Logging the values of Observables. Refer to the tapObservables() method.
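A minimal sketch of tap used for logging, assuming a simple numeric stream. The debugLog and results arrays are illustrative names, not part of tapObservables().

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const debugLog: string[] = [];
const results: number[] = [];

of(1, 2, 3)
  .pipe(
    tap(n => debugLog.push(`before map: ${n}`)), // side effect only
    map(n => n * 2),
    tap(n => debugLog.push(`after map: ${n}`)), // whatever tap returns is ignored
  )
  .subscribe(n => results.push(n));

console.log(debugLog, results);
```

The subscriber receives exactly what map produced; the two tap calls only observe the values on their way through.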
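map : Transforms each value emitted by the source using a projection function. It can be used with HTTP requests, with DOM events, for reshaping the input data, etc.
Arguments: map(value, index). value is the value emitted by the Observable; index (optional) is 0 for the first value emitted and is incremented by one for every subsequent value.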
Note : The KeyValuePipe from @angular/common can transform an object into an array of key-value pairs.
const obj = {person1: 'jon',person2: 'hopper',person3: 'mona'}
const transformObj = this.keyValuePipe.transform(obj);
Result : [ { "key": "person1", "value": "jon" }, { "key": "person2", "value": "hopper" }, { "key": "person3", "value": "mona" } ]
- We can also use multiple maps within the same pipe.
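The index argument and chained maps can be sketched as follows. The sample names are borrowed from the object above and the labels array is made up for the example.

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const labels: string[] = [];

of('jon', 'hopper', 'mona')
  .pipe(
    // First map: transform the value only.
    map(name => name.toUpperCase()),
    // Second map: also use the zero-based emission index.
    map((name, index) => `person${index + 1}: ${name}`),
  )
  .subscribe(label => labels.push(label));

console.log(labels);
```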
- filter : The most widely used operator. It emits only the items that satisfy a condition.
The switchMap operator maps each value from the source Observable to an inner Observable. It then subscribes to that inner Observable and emits the values from it.
The function passed to switchMap must return an Observable.
If the function passed to map returns an Observable, map emits that Observable itself as a value; switchMap subscribes to the inner Observable and emits the values from it.
someStream$.pipe(
switchMap(args => makeApiCall(args)), // must return a stream
map(response => process(response)) // returns a value of any shape, usually an object or a primitive
).subscribe(doSomethingWithResults);
Example use case: This works perfectly for scenarios like form Input/search Input where you are no longer concerned with the response of the previous request when a new input arrives.
The main difference between switchMap and other flattening operators is the cancelling effect. On each emission the previous inner observable (the result of the function you supplied) is cancelled and the new observable is subscribed. You can remember this by the phrase switch to a new observable.
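The cancelling effect can be sketched without real HTTP calls by letting Subjects stand in for pending responses. fakeSearch, searchTerm$ and the two response Subjects are hypothetical names used only for this illustration.

```typescript
import { Observable, Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const searchTerm$ = new Subject<string>();

// Stand-ins for two in-flight requests (assumption: no real backend here).
const responseForA$ = new Subject<string>();
const responseForAb$ = new Subject<string>();

// Hypothetical API call: returns the pending response for a search term.
const fakeSearch = (term: string): Observable<string> =>
  term === 'a' ? responseForA$ : responseForAb$;

const results: string[] = [];
searchTerm$
  .pipe(switchMap(term => fakeSearch(term)))
  .subscribe(result => results.push(result));

searchTerm$.next('a'); // subscribes to responseForA$
searchTerm$.next('ab'); // unsubscribes from responseForA$, switches to responseForAb$

responseForA$.next('results for a'); // dropped: that inner subscription was cancelled
responseForAb$.next('results for ab'); // delivered

console.log(results);
```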
mergeMap : This operator is best used when you wish to flatten an inner Observable but want to manually control the number of inner subscriptions.
In contrast to switchMap, mergeMap allows multiple inner subscriptions to be active at a time.
If the order of emission and subscription of inner Observables is important, try concatMap. mergeMap never cancels an inner Observable.
Memory leaks : Because mergeMap keeps multiple inner subscriptions active, long-lived inner Observables can lead to memory leaks, so make sure to limit them with operators like take or takeUntil.
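A sketch of mergeMap keeping several inner subscriptions alive, with take bounding each one. The Subjects source$, innerA$ and innerB$ are invented for the example.

```typescript
import { Subject } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

const source$ = new Subject<string>();
const innerA$ = new Subject<string>();
const innerB$ = new Subject<string>();

const received: string[] = [];
source$
  .pipe(
    // take(2) ends each inner subscription after two values,
    // so no inner subscription stays open forever.
    mergeMap(key => (key === 'a' ? innerA$ : innerB$).pipe(take(2))),
  )
  .subscribe(value => received.push(value));

source$.next('a');
innerA$.next('a1');
source$.next('b'); // innerA$ stays subscribed, unlike with switchMap
innerA$.next('a2'); // still delivered, then take(2) closes this inner subscription
innerB$.next('b1');
innerA$.next('a3'); // not delivered

console.log(received);
```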
Why use take?
- When you are interested in only the first emission, you want to use take. Maybe you want to see what the user first clicked on when they entered the page, or you would want to subscribe to the click event and just take the first emission.
- Another use case is when you need to take a snapshot of data at a particular point in time but do not require further emissions. For example, a stream of user token updates, or a route guard based on a stream in an Angular application.
💡 If you want to take a number of values based on some logic, or another observable, you can use takeUntil or takeWhile! 💡
take is the opposite of skip: take emits the first n emissions, while skip skips the first n emissions.
obs.pipe(take(2)).subscribe()
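For comparison, here is a short sketch applying take and skip to the same source. numbers$ is just an example stream.

```typescript
import { of } from 'rxjs';
import { skip, take } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);
const taken: number[] = [];
const skipped: number[] = [];

numbers$.pipe(take(2)).subscribe(n => taken.push(n)); // first two values, then complete
numbers$.pipe(skip(2)).subscribe(n => skipped.push(n)); // everything after the first two

console.log(taken, skipped);
```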
The takeUntil operator returns an Observable that emits values from the source Observable until the notifier Observable emits a value.
takeUntil(notifier: Observable): Observable
We must pass a notifier Observable as the argument to the takeUntil operator.
- takeUntil emits the values from the source Observable as long as it does not receive any value from the notifier Observable.
- When the notifier emits a value, takeUntil completes the resulting Observable and unsubscribes from the source.
Check sample code in transform.component.ts
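A minimal sketch of the notifier pattern, assuming a Subject named destroy$ as the notifier (a common convention, not necessarily what transform.component.ts uses).

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source$ = new Subject<number>();
const destroy$ = new Subject<void>(); // hypothetical notifier
const notifications: string[] = [];

source$.pipe(takeUntil(destroy$)).subscribe({
  next: value => notifications.push(`next: ${value}`),
  complete: () => notifications.push('complete'),
});

source$.next(1);
source$.next(2);
destroy$.next(); // notifier emits: the subscription above completes
source$.next(3); // no longer delivered

console.log(notifications);
```

In a component, the notifier would typically emit from ngOnDestroy so that subscriptions end with the component.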
The takeWhile operator keeps emitting values from the source Observable as long as they pass the given condition (predicate). When it receives a value that does not satisfy the condition, it completes the Observable.
The difference from filter is that takeWhile discards the rest of the stream when it receives the first value that does not satisfy the condition, whereas the filter operator never stops the Observable.
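The contrast between takeWhile and filter shows up when the same predicate is applied to the same source, as in this sketch with made-up sample values.

```typescript
import { of } from 'rxjs';
import { filter, takeWhile } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 1, 2);
const isSmall = (n: number): boolean => n < 3;

const withTakeWhile: number[] = [];
const withFilter: number[] = [];

// Stops for good at the first failing value (3).
numbers$.pipe(takeWhile(isSmall)).subscribe(n => withTakeWhile.push(n));
// Drops only the failing value and keeps going.
numbers$.pipe(filter(isSmall)).subscribe(n => withFilter.push(n));

console.log(withTakeWhile, withFilter);
```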
The takeLast operator emits the last n values from the source Observable, once the source completes.
The first/last operators emit the first/last value that matches the condition, if a condition is given. If no condition is given, they emit the first/last value they receive.
An error notification is sent if the source completes without emitting a matching value.
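The following sketch exercises first, last and takeLast on one source, plus the error case on an empty stream. The out array and the sample numbers are illustrative only.

```typescript
import { EMPTY, of } from 'rxjs';
import { first, last, takeLast } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4);
const out: string[] = [];

numbers$.pipe(first()).subscribe(n => out.push(`first: ${n}`));
numbers$.pipe(first(n => n > 2)).subscribe(n => out.push(`first > 2: ${n}`));
numbers$.pipe(last()).subscribe(n => out.push(`last: ${n}`));
numbers$.pipe(last(n => n < 3)).subscribe(n => out.push(`last < 3: ${n}`));
numbers$.pipe(takeLast(2)).subscribe(n => out.push(`takeLast: ${n}`));

// An empty source has no first value, so first() sends an error notification.
EMPTY.pipe(first()).subscribe({
  error: err => out.push(`error: ${err.name}`),
});

console.log(out);
```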
The skip operators skip values from the source Observable based on a count or a condition. skip, skipUntil and skipWhile skip values from the start of the source. The skipLast operator skips elements from the end of the source.
filter emits a value if the predicate (condition) is true. skipWhile skips values while the predicate (condition) is true; once the predicate returns false, it emits that value and all remaining values.
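A short sketch comparing skipWhile with filter, and showing skipLast, on the same made-up source.

```typescript
import { of } from 'rxjs';
import { filter, skipLast, skipWhile } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 1, 2);
const isSmall = (n: number): boolean => n < 3;

const withSkipWhile: number[] = [];
const withFilter: number[] = [];
const withSkipLast: number[] = [];

// Skips 1 and 2; from the first failing value (3) onwards everything is emitted.
numbers$.pipe(skipWhile(isSmall)).subscribe(n => withSkipWhile.push(n));
// Checks every value independently.
numbers$.pipe(filter(isSmall)).subscribe(n => withFilter.push(n));
// Drops the final two values.
numbers$.pipe(skipLast(2)).subscribe(n => withSkipLast.push(n));

console.log(withSkipWhile, withFilter, withSkipLast);
```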
Subjects are a special kind of Observable that acts as both Observer and Observable. They allow us to emit new values to the Observable stream using the next method.
- All the subscribers who subscribe to the subject share the same instance of the subject and hence receive the same values.
- A Subject is a special type of Observable which allows values to be multicast to many observers.
A Subject implements the subscribe method as well as next, error and complete.
subject$ = new Subject();
ngOnInit() {
this.subject$.subscribe(val => console.log(val));
this.subject$.next(1);
this.subject$.next(2);
this.subject$.complete();
}
Observables are classified into two groups.
- Cold Observable
- Hot Observable
The cold observable does not activate the producer until there is a subscriber. The producer emits the value only when a subscriber subscribes to it.
The Hot observable does not wait for a subscriber to emit the data. It can start emitting the values right away.
subject$ = new Subject();
ngOnInit() {
this.subject$.next(1);
this.subject$.next(2);
this.subject$.complete();
}
In the above example, since there were no subscribers, no one receives the data but that did not stop the subject from emitting data.
Now consider the following example. Here the values 1 and 2 are lost because the subscription happens after the subject emits them.
ngOnInit() {
this.subject$.next(1);
this.subject$.next(2);
this.subject$.subscribe(val => console.log(val));
this.subject$.next(3);
this.subject$.next(4);
this.subject$.complete();
}
An Observer needs to implement the next, error and complete callbacks (all optional).
let obs$ = new Observable(observer => {
observer.next(1);
observer.error('error');
})
this.subject$.subscribe(val => {
console.log(val);
});
obs$.subscribe(this.subject$);
Since the subject$ implements next method, it receives the value from observable and emits them to subscribers. So we can subscribe to observable and use subject$ as observer.
Another important distinction between an Observable and a Subject is that Subjects are multicast.
- More than one subscriber can subscribe to a Subject. They will share the same instance of the Observable. All subscribers will receive the same event when the Subject emits it.
- Multiple observers of an Observable will each receive a separate instance (execution) of the Observable.
Check uniCastVsMultiCast method in subject.component.ts.
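The difference can be sketched with a counter that records how often the producer runs. This is an illustration with invented names (executions, cold$, subject$), not the uniCastVsMultiCast implementation.

```typescript
import { Observable, Subject } from 'rxjs';

// Unicast: the producer function runs once per subscriber.
let executions = 0;
const cold$ = new Observable<number>(subscriber => {
  executions += 1;
  subscriber.next(executions);
});

const coldA: number[] = [];
const coldB: number[] = [];
cold$.subscribe(v => coldA.push(v));
cold$.subscribe(v => coldB.push(v));

// Multicast: one emission is shared by all current subscribers.
const subject$ = new Subject<number>();
const hotA: number[] = [];
const hotB: number[] = [];
subject$.subscribe(v => hotA.push(v));
subject$.subscribe(v => hotB.push(v));
subject$.next(42);

console.log(coldA, coldB, hotA, hotB);
```

Each Observable subscriber triggers its own execution and sees a different value, while both Subject subscribers see the same one.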
Whenever a subscriber subscribes to a Subject, the Subject adds it to an array of subscribers. This way the Subject keeps track of all the subscribers and emits the event to all of them.
- BehaviorSubject
- ReplaySubject
- AsyncSubject
BehaviorSubject requires an initial value and stores the current value and emits it to the new subscribers.
subject$ = new BehaviorSubject(0);
subject$.subscribe(val => console.log(val)); //0
subject$.next(1);
BehaviorSubject always remembers the last emitted value and shares it with new subscribers.
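A sketch with an early and a late subscriber shows the current value being handed over on subscription. The names count$, early and late are chosen for this example.

```typescript
import { BehaviorSubject } from 'rxjs';

const count$ = new BehaviorSubject<number>(0);
const early: number[] = [];
const late: number[] = [];

count$.subscribe(v => early.push(v)); // receives the initial value 0 right away
count$.next(1);
count$.subscribe(v => late.push(v)); // receives the current value 1 right away
count$.next(2);

// The current value can also be read synchronously.
const current = count$.getValue();

console.log(early, late, current);
```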
ReplaySubject replays old values to new subscribers when they first subscribe.
- The ReplaySubject will store every value it emits in a buffer. We can configure the buffer using the bufferSize and windowTime arguments.
bufferSize : No. of items that ReplaySubject will keep in its buffer. It defaults to infinity.
windowTime : The amount of time (in milliseconds) to keep a value in the buffer. It defaults to infinity.
Because ReplaySubject stores the values in a buffer, a subscriber still receives them even when it subscribes after they were emitted.
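A minimal sketch of the buffer in action, assuming a bufferSize of 2 and no windowTime.

```typescript
import { ReplaySubject } from 'rxjs';

// Keep only the two most recent values in the buffer.
const replay$ = new ReplaySubject<number>(2);

replay$.next(1);
replay$.next(2);
replay$.next(3);

const late: number[] = [];
replay$.subscribe(v => late.push(v)); // replays 2 and 3 from the buffer
replay$.next(4); // delivered live

console.log(late);
```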
AsyncSubject emits only its latest value, and only when it completes. If it errors out, it emits the error and no values at all.
Check asyncSubjectDemo method in subject.component.ts
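The behaviour can be sketched as follows. This is an illustration with invented names and not the asyncSubjectDemo code itself.

```typescript
import { AsyncSubject } from 'rxjs';

const async$ = new AsyncSubject<number>();
const log: string[] = [];

async$.subscribe({
  next: v => log.push(`next: ${v}`),
  complete: () => log.push('complete'),
});

async$.next(1);
async$.next(2);
async$.next(3);
const deliveredBeforeComplete = log.length; // nothing has been delivered yet

async$.complete(); // only now is the latest value (3) emitted

console.log(deliveredBeforeComplete, log);
```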
The scan and reduce operators apply an accumulator function to the values of the source Observable. scan emits every intermediate result of the accumulation, while reduce emits only the final result once the source completes. Both accept an optional seed value as the initial value.
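A running sum makes the difference between scan and reduce easy to see. The sketch below uses a seed of 0 and sample values chosen for illustration.

```typescript
import { of } from 'rxjs';
import { reduce, scan } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4);
const add = (total: number, value: number): number => total + value;

const runningTotals: number[] = [];
const finalTotals: number[] = [];

numbers$.pipe(scan(add, 0)).subscribe(v => runningTotals.push(v)); // every intermediate sum
numbers$.pipe(reduce(add, 0)).subscribe(v => finalTotals.push(v)); // only the last sum

console.log(runningTotals, finalTotals);
```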
debounce and debounceTime: Both emit a value from the source Observable only after a certain amount of time has elapsed since the last value. Both emit only the latest value and discard any intermediate values.
Typeahead/autocomplete fields are one of the most common use cases for the debounce operators.
- As the user types in the typeahead field, we need to listen to it and send an HTTP request to the back end to get a list of possible values. If we send HTTP requests for every keystroke, we end up making numerous calls to the server.
- By using the debounce operators, we wait until the user pauses typing before sending an HTTP request. This eliminates unnecessary HTTP requests.
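The typeahead scenario can be sketched with simulated keystrokes. To keep the timing deterministic, this example passes a VirtualTimeScheduler to debounceTime, which is an assumption made purely for the demo; in a real component the default scheduler and a form control's valueChanges stream would normally be used. The names keystrokes$ and requests are hypothetical.

```typescript
import { Subject, VirtualTimeScheduler } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

// Virtual clock: time advances only when flush() runs the queued actions.
const scheduler = new VirtualTimeScheduler();

const keystrokes$ = new Subject<string>();
const requests: string[] = []; // stands in for the HTTP requests that would be sent

keystrokes$
  .pipe(debounceTime(300, scheduler)) // wait for a 300 ms pause in typing
  .subscribe(term => requests.push(term));

// Simulated typing: three quick keystrokes, a pause, then one more.
scheduler.schedule(() => keystrokes$.next('a'), 0);
scheduler.schedule(() => keystrokes$.next('an'), 100);
scheduler.schedule(() => keystrokes$.next('ang'), 200);
scheduler.schedule(() => keystrokes$.next('angular'), 1000);

scheduler.flush(); // run the virtual clock to the end

console.log(requests);
```

Four keystrokes result in only two would-be requests, one for each pause in typing.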