šŸ„ Subscribe data in Service using RxJS 5

NOTE: If you use ngrx/store (Redux Store) and ngrx/effects (Redux Saga), the approach is different. See the tutorial Reactive Programming (RxJS 5, Observable) and Redux (Store, Saga) in Angular2-Meteor.

Since RxJS 5 comes with Angular 2, we should take advantage of it.

This tutorial is about how to subscribe to data in a service using RxJS 5. To follow it, you need some very basic RxJS knowledge.

First, let's explain BehaviorSubject a little bit.

BehaviorSubjects have all the functionality of Subject, but also

  1. output the last value received to all observers upon subscription. (The main reason I choose it)
  2. allow for an initial value to be set
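
To make those two properties concrete, here is a minimal hand-rolled sketch of the behaviour. `TinyBehaviorSubject` is a hypothetical stand-in written only for illustration; it is not the RxJS implementation and leaves out errors, completion and unsubscription.

```typescript
// Hypothetical, simplified stand-in for RxJS's BehaviorSubject (illustration only).
type Observer<T> = (value: T) => void;

class TinyBehaviorSubject<T> {
  private observers: Observer<T>[] = [];

  // Property 2: an initial value is supplied up front.
  constructor(private current: T) {}

  // Remember the value, then push it to every current observer.
  next(value: T): void {
    this.current = value;
    this.observers.forEach(observer => observer(value));
  }

  // Property 1: a new observer immediately receives the latest value.
  subscribe(observer: Observer<T>): void {
    this.observers.push(observer);
    observer(this.current);
  }
}

function demoBehaviorSubject(): string[] {
  const log: string[] = [];
  const isOpen$ = new TinyBehaviorSubject<boolean>(false);

  isOpen$.subscribe(v => log.push(`early: ${v}`)); // gets the initial value
  isOpen$.next(true);                              // early observer is notified
  isOpen$.subscribe(v => log.push(`late: ${v}`));  // gets the last value

  return log; // ["early: false", "early: true", "late: true"]
}
```

The late observer still sees `true` even though it subscribed after `next(true)` was called, which is the property the services below rely on.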

Check here for more info about the differences among Subject, BehaviorSubject, ReplaySubject, and AsyncSubject. Pick the one that fits your needs.

Real-time data: currentUser

Here is the user service:

user.service.ts

import { Injectable } from '@angular/core';
import { Meteor } from 'meteor/meteor';
import { MeteorComponent } from 'angular2-meteor';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

@Injectable()
export class UserService extends MeteorComponent {
  currentUser$ = new BehaviorSubject<User>(null);
  currentUserId: string;

  constructor() { super(); }
  
  // You can move this part into constructor(), but I find calling it explicitly clearer.
  // Also, the next example shows that some data only needs to be subscribed to on demand.
  subscribeCurrentUser() {
    this.subscribe('users.currentUser', () => {
      this.autorun(() => {
        this.currentUser$.next(<User>Meteor.user());    // <User> here is to convert the type Meteor.User to my custom interface User
        this.currentUserId = Meteor.userId();
      });
    });
  }
}

For this example, I use currentUserId a lot in my app, so I keep a currentUserId there too. But you don't have to.

We subscribe to the currentUser data only once, when the website starts, instead of subscribing and unsubscribing in each component, which solves a big performance issue.

// app.ts

@Component({ ... })
class App implements OnInit {
  constructor(private _userService: UserService) {}
  ngOnInit() {
    this._userService.subscribeCurrentUser();
  }
}

When you need to use currentUser information in a component:

// random.component.ts

@Component({ ... })
export class RandomComponent implements OnInit, OnDestroy {
  currentUser: User;
  subscription: Subscription;   // import { Subscription } from 'rxjs/Subscription';

  constructor(private _userService: UserService) {}

  ngOnInit() {
    this.subscription = this._userService.currentUser$.subscribe(currentUser => {
      if (!currentUser) return;   // don't forget this: you may subscribe before the data has arrived from the server

      this.currentUser = currentUser;
      // do other things
    });
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

If you are familiar with RxJS 5 operators, you can replace if (!currentUser) return; with .filter(x => !!x) (you may need import 'rxjs/add/operator/filter'; when you import from 'rxjs/BehaviorSubject'), like below:

    this.subscription = this._userService.currentUser$
      .filter(x => !!x)
      .subscribe(currentUser => {
        this.currentUser = currentUser;
        // do other things
      });

BehaviorSubject also allows for an initial value, so sometimes you can give it one, such as:

isOpen = new BehaviorSubject<boolean>(false);

Not real-time data

Sometimes you don't want real-time data, or you don't want to keep the data around all the time. In that case, subscribe without autorun. You then need to unsubscribe manually once you no longer need the data, to avoid wasting resources.
The sample below also shows how to subscribe with parameters (a product code here) if you need to.

import { Injectable } from '@angular/core';
import { Mongo } from 'meteor/mongo';
import { MeteorComponent } from 'angular2-meteor';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

import { Products } from '../collections/products';

@Injectable()
export class ProductsService extends MeteorComponent {
  products: Mongo.Cursor<Product>;
  products$ = new BehaviorSubject<Mongo.Cursor<Product>>(null);
  subscription: any;

  constructor() { super(); }
  
  subscribeProducts(code: string) {
    this.subscription = this.subscribe('products.private', code, () => {
      this.products = Products.find({ code: code });
      this.products$.next(this.products);

      // Or just: this.products$.next(Products.find({ code: code }));
    }, true);
  }

  unsubscribeProducts() {
    if (this.subscription) this.subscription.stop();
  }
}

Note

Some data is needed in only one component, and you could subscribe to it in that component.
However, we try to avoid that and subscribe in the service instead. The reason is that we want to use dependency injection as a pattern. It helps make the code testable and reusable.

  1. Testable: When we test, we can switch to another service and inject fake data into that component.
  2. Reusable: In the future, for example, if we want to switch to Apollo to take advantage of GraphQL, we just need to rewrite our service. We can still use the same component.
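
A minimal sketch of the testability point, using plain TypeScript classes rather than Angular's injector. All names here (`UserSource`, `FakeUserService`, `GreetingComponent`, `greetingFor`) are hypothetical and are not part of the tutorial's code; the idea is only that the component depends on a shape, so a test can hand it a fake.

```typescript
// Hypothetical names throughout; plain classes stand in for Angular DI.
interface User { name: string; }

// The component depends only on this shape, not on a concrete service.
interface UserSource {
  onCurrentUser(callback: (user: User | null) => void): void;
}

// Test double that delivers fake data without touching Meteor.
class FakeUserService implements UserSource {
  constructor(private fakeUser: User | null) {}

  onCurrentUser(callback: (user: User | null) => void): void {
    callback(this.fakeUser);
  }
}

class GreetingComponent {
  greeting = 'loading';

  // The service is passed in, so a test can supply a fake one.
  constructor(private users: UserSource) {}

  ngOnInit(): void {
    this.users.onCurrentUser(user => {
      if (!user) return; // same null guard as in the tutorial
      this.greeting = `Hello, ${user.name}`;
    });
  }
}

// Builds the component with a fake service and returns what it would render.
function greetingFor(user: User | null): string {
  const component = new GreetingComponent(new FakeUserService(user));
  component.ngOnInit();
  return component.greeting;
}
```

Swapping in a different data source later (the Apollo case above) would mean writing another class with the same shape, while `GreetingComponent` stays unchanged.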

Read this article for a deeper understanding of dependency injection.

This tutorial only covers using a service to subscribe to data. Services have many other uses, such as passing data between two components, which are not covered here.

Feel free to comment and improve it.


Thanks for the wonderful article. You should really consider starting a blog or something.
I have a couple of questions.

  1. I'd like to see the publication code for 'users.currentUser' in case there is something important there, just for the sake of the completeness of the example.

  2. In the code of random.component.ts you have a line

if (!currentUser) return;   // don't forget this, because you may subscribe the data before you got data from the server

Is this a general pattern for preventing a component from initializing before the subscription data becomes available from the service? I'm asking because I had lots of trouble in one test project trying to accomplish this. So when you put this line in, it keeps returning until data becomes available, and then rerenders the component when it does?

Thanks.

What makes this very powerful is the ability to instantiate the service when you load a component tree and access it from all the components in that tree. All the update, insert logic is in the service instead of spread over branch components.

I have three levels of encapsulation. Meteor methods to access the collection, ensuring consistency. Service on the client which provides an interface to the collection, then the components/templates which provide an interface for viewing and editing.

import { ContactService } from './contact-service';

@Component({
  …
  providers: [ContactService]
})
export class RootComponent {}

@Component({
  …
  template: `<div *ngFor="let contact of _contactService.contacts"> <p>{{contact.name}}</p> </div>`
})
export class BranchComponent {
  constructor(public _contactService: ContactService) {}
}

For the first question:

Meteor.publish('users.currentUser', function () {
  return Meteor.users.find(this.userId, { fields: { name: 1, emailHash: 1 }});   // publish only the fields you need
});

For the second question:

this.subscription = this._userService.currentUser$.subscribe(currentUser => {
  if (!currentUser) return;
  // if you call currentUser$.subscribe before the currentUser data has arrived from the server, currentUser is null
  // so using currentUser here without the check would cause trouble
});

@Hongbo_Miao

Now I see that my second question was somehow cut off when I submitted it, so it is incomplete.
I asked whether that pattern can be used generally to ensure that the service has been instantiated, i.e. that data from the service is available to the component model.
So if the data is unavailable from the service on the first, second, or third run, the subscription returns null, but as soon as the data is available it will hydrate the component's model, and the component won't crash. Is that true?

Yes, that is correct.

How do you install rxjs in your Meteor app? I'm using meteor@1.1.14 with urigo:angular2-meteor 0.5.2. I can't understand how you manage to make the following line work:

I found a post of yours on StackOverflow here, but even with the comments, it seems you needed a miracle to make it work.
Do you know now how to make it happen? Is it a Meteor 1.3 miracle?

Oh, at that time I was still using Meteor 1.2, so I think the problem may have been related to that.

Now, RxJS 5 should be installed when you install Angular 2.

Make sure you have this line inside of your package.json.

{
  "dependencies": {
    "rxjs": "5.0.0-beta.7"
  }
}

Then just use this directly:

import { BehaviorSubject } from 'rxjs/BehaviorSubject';

Thanks for the awesome article Hongbo. I was able to implement some basic rxJS into my Angular2 Meteor app.

I have a question about preserving the stream. I've altered ("composed") the stream this.currentUser$ in user.service.ts by applying various RxJS operators such as merge, map, scan, filter, etc.

this.currentUser$
  .merge(...)
  .scan(..)
etc..

If I subscribe to this.currentUser$ within user.service.ts, the operations are applied as expected.

However, the alterations made to the stream are not carried over into the this.currentUser$ stream returned to random.component.ts. It appears the initially declared stream currentUser$ = new BehaviorSubject(null); is returned instead?

Do you know of any way of transferring the stream with its operators from user.service.ts to random.component.ts?

I'm thinking I could preserve the operators applied to the stream by creating some global streams, but I want to avoid that if possible.

Something to be aware of is that MeteorComponent uses promises.

I have been banging my head against the wall with this. I have a MeteorComponent that is Injectable(). I have a number of subscriptions that work just fine.

I set up an observable. In the constructor
…
this._contactId = new Subject();
this.contactid$ = this._contactId.asObservable();
…

Then a method
SetContactId(id: string) {
console.log("SetContactId "+id);
this._contactId.next(id);
…

Then in a template

<div  *ngIf="!(_ContactService.contactid$ | async)">
	<contact-new></contact-new>
</div>
<div style="flex:1; order:1" *ngIf="(_ContactService.contactid$ | async)">
	<contact-editor ></contact-editor>
</div>

If I call SetContactId from a click handler, it works as expected. But if I call it from a Meteor method callback, it doesn't work: the template won't update unless I tab out, click somewhere, etc., triggering change detection.

this.call('contacts.add', data, function (error, id) {
  self.SetContactId(id);
});

The Meteor method simply inserts a document into a collection and returns the id. I think MeteorComponent.call returns a promise. In any case the template doesn't update with this call, but it does if SetContactId is called from a non-MeteorComponent callback function.

Hi, @cesargalindo can you rephrase this part? I didn't get it.

It is related to NgZone. Try this:

import { NgZone } from '@angular/core';

constructor(private _ngZone: NgZone) {}

// An arrow function keeps `this` bound to the service inside the callback.
this.call('contacts.add', data, (error, id) => {
  this._ngZone.run(() => {
    this.SetContactId(id);
  });
});

Thanks. That works perfectly.

Just to get this clear in my mind: ngZone.run() tells Angular 2 that code outside the Angular zone has run and produced results that need a view update.

Hi, @Hongbo_Miao,

apologies for the lack of clarity. Here's what I'm trying to accomplish.

I have a service in allValidCards.ts which provides a BehaviorSubject stream allValidCards$, similar to how the currentUser$ stream is provided by user.service.ts to random.component.ts.

Within allValidCards.service.ts, allValidCards$ is composed from three other services for three collections.

The three Meteor collections are normalCards, freeCards, and premiumCards, and their corresponding BehaviorSubject streams are:

NormalCards$ = new BehaviorSubject<Mongo.Cursor<NormalCard>>(null);
FreeCards$ = new BehaviorSubject<Mongo.Cursor<FreeCard>>(null);
PremiumCards$ = new BehaviorSubject<Mongo.Cursor<PremiumCard>>(null);

Using RxJS I'm composing the stream allValidCards$ as follows:

this.allValidCards$
         .merge(this.NormalCards$, this.FreeCards$, this.PremiumCards$)
         .scan( (acc, curr) => {
             if (acc != null) {
                 let tmp_stream =  curr.concat(acc);
                 return tmp_stream;
             }
             return curr;
         });

// then I initialize the streams with data:
this.normalCards = NormalCards.find({ checked: true }).fetch();
this.NormalCards$.next(this.normalCards);	

this.freeCards = FreeCards.find({ checked: true }).fetch();
this.FreeCards$.next(this.freeCards);

this.premiumCards = PremiumCards.find({ checked: true }).fetch();
this.PremiumCards$.next(this.premiumCards);

If I .subscribe to this.allValidCards$ within allValidCards.service.ts, the operators are applied as expected. All values from all streams are merged into allValidCards$.

However, when subscribing to the stream in another component such as random.component.ts, none of the applied operators are preserved. I'm only able to push information onto the stream using this.allValidCards$.next(this.premiumCards);

This part in Service, right? Where do you put this part?

this.allValidCards$
     .merge(this.NormalCards$, this.FreeCards$, this.PremiumCards$)
     .scan( (acc, curr) => {
         if (acc != null) {
             let tmp_stream =  curr.concat(acc);
             return tmp_stream;
         }
         return curr;
     });

I updated the tutorial a little. If you are familiar with RxJS operators, you can use

.filter(x => !!x) instead of if (!currentUser) return;, which is cleaner.


Yes, it's in the service. I placed it within a subscribeCards() { … } function, similar to subscribeCurrentUser() { … }

subscribeCards() {

    this.subscription = this.subscribe('cards.all', () => {

        this.allValidCards$
            .merge(this.NormalCards$, this.FreeCards$, this.PremiumCards$)
            .scan( (acc, curr) => {
                if (acc != null) {
                    let tmp_stream =  curr.concat(acc);
                    return tmp_stream;
                }
                return curr;
            });

       // If I   .subscribe( x => console.log(x) )  to  this.allValidCards$  here it works in here...

        // then I initialize the streams with data:
        this.normalCards = NormalCards.find({ checked: true }).fetch();
        this.NormalCards$.next(this.normalCards);

        this.freeCards = FreeCards.find({ checked: true }).fetch();
        this.FreeCards$.next(this.freeCards);

        this.premiumCards = PremiumCards.find({ checked: true }).fetch();
        this.PremiumCards$.next(this.premiumCards);

    }, true);
}

If I guess correctly, your allValidCards$ is a plain Observable.
So when you subscribe in your component, the value was emitted before you subscribed, which means it is already gone.

You can use a BehaviorSubject in this case.
Or check the difference between hot and cold observables.
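
A minimal sketch of the late-subscriber point, assuming allValidCards$ behaves like a plain Subject. `PlainSubject`, `RememberingSubject` and `lateSubscriberSees` are hypothetical, simplified stand-ins written for illustration; they are not the RxJS classes.

```typescript
// Hypothetical, simplified stand-ins for a plain Subject and a BehaviorSubject.
type Listener<T> = (value: T) => void;

class PlainSubject<T> {
  protected listeners: Listener<T>[] = [];

  // Values go only to listeners that are already subscribed.
  next(value: T): void {
    this.listeners.forEach(listener => listener(value));
  }

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }
}

class RememberingSubject<T> extends PlainSubject<T> {
  constructor(private last: T) { super(); }

  next(value: T): void {
    this.last = value;
    super.next(value);
  }

  subscribe(listener: Listener<T>): void {
    super.subscribe(listener);
    listener(this.last); // replay the most recent value to the newcomer
  }
}

// Emits first, subscribes afterwards, and reports what the subscriber saw.
function lateSubscriberSees(subject: PlainSubject<string[]>): string[][] {
  const seen: string[][] = [];
  subject.next(['card-1', 'card-2']); // emitted before anyone subscribes
  subject.subscribe(cards => seen.push(cards));
  return seen;
}
```

With `new PlainSubject<string[]>()` the late subscriber sees nothing, because the cards were emitted before it subscribed. With `new RememberingSubject<string[]>([])` it receives the two cards, which is the behaviour a BehaviorSubject would give the component.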

Hi @Hongbo_Miao,

I followed your tutorial and created a new service too. The current user service works, but when I repeated the pattern with another subscription, the data arrives on the client but the HTML template shows the value as undefined. I would appreciate your help.

@puedesleerlo Some code would help locate the issue. 🙂