🏄 Subscribe data in Service using RxJS 5

It is related with ngZone. Try this:

import { NgZone } from '@angular/core'

constructor(private _ngZone: NgZone) {}

this.call('contacts.add',data,function (error,id) {
    this._ngZone.run(() => {
      self.SetContactId(id);
    });
}

Thanks. That works perfectly.

Just to get this clear in my mind. The ngzone.run() tells angular2 that functions outside the angular environment has run with results that need updating.

Hi, @Hongbo_Miao,

apologies for the lack of clarity. Here’s what I’m trying to accomplish.

I have a service in allValidCards.ts which provides a BehaviorSubject stream allValidCards$ similar to how the "currentUser$ stream is provided by user.service.ts to randam.component.ts.

Within allValidCards.service.ts, the allValidCards$ is composed from three other services from three collections.

The three meteor collections are normalCards, freeCards, and premiumCards and and their corresponding BehaviorSubject streams are:

NormalCards$ = new BehaviorSubject<Mongo.Cursor<NormalCard>>(null);
FreeCards$ = new BehaviorSubject<Mongo.Cursor<FreeCard>>(null);
PremiumCards$ = new BehaviorSubject<Mongo.Cursor<PremiumCard>>(null);

Using rxJS I’m composing the stream allValidCards$ as follows:

this.allValidCards$
         .merge(this.NormalCards$, this.FreeCards$, this.PremiumCards$)
         .scan( (acc, curr) => {
             if (acc != null) {
                 let tmp_stream =  curr.concat(acc);
                 return tmp_stream;
             }
             return curr;
         });

// then I initialize the streams with data:
this.normalCards = NormalCards.find({ checked: true }).fetch();
this.NormalCards$.next(this.normalCards);	

this.freeCards = FreeCards.find({ checked: true }).fetch();
this.FreeCards$.next(this.freeCards);

this.premiumCards = PremiumCards.find({ checked: true }).fetch();
this.PremiumCards$.next(this.premiumCards);

If I .subscribe to this.allValidCards$ within allValidCards.service.ts, the operators are applied as expected. All values from all streams are merged into the allValidCards$

However, when “subscribing to the stream” in another component such as randam.components.ts, none of the applied operators are preserved. I’m only able to push information onto the stream using this.allValidCards$.next(this.premiumCards);

This part in Service, right? Where do you put this part?

this.allValidCards$
     .merge(this.NormalCards$, this.FreeCards$, this.PremiumCards$)
     .scan( (acc, curr) => {
         if (acc != null) {
             let tmp_stream =  curr.concat(acc);
             return tmp_stream;
         }
         return curr;
     });

I updated a little the tutorial, you can use RxJS Operators if you are familiar

.filter(x => !!x) instead of if (!currentUser) return;, which is cleaner,

1 Like

Yes, it’s in service. I placed it within a subscribeCards() {… } function similar to subscribeCurrentUser() { … }

subscribeCards() {

    this.subscription = this.subscribe('cards.all', () => {

        this.allValidCards$
            .merge(this.NormalCards$, this.FreeCards$, this.PremiumCards$)
            .scan( (acc, curr) => {
                if (acc != null) {
                    let tmp_stream =  curr.concat(acc);
                    return tmp_stream;
                }
                return curr;
            });

       // If I   .subscribe( x => console.log(x) )  to  this.allValidCards$  here it works in here...

        // then I initialize the streams with data:
        this.normalCards = NormalCards.find({ checked: true }).fetch();
        this.NormalCards$.next(this.normalCards);

        this.freeCards = FreeCards.find({ checked: true }).fetch();
        this.FreeCards$.next(this.freeCards);

        this.premiumCards = PremiumCards.find({ checked: true }).fetch();
        this.PremiumCards$.next(this.premiumCards);

    }, true);
}

If I guess correct, your allValidCards$ is Observable.
So when you subscribe in your component, the value emit before you subscribe, which means the value is already gone.

So you can use BehaviorSubject in this case.
Or check the difference between Hot and Cold observables.

Hi @Hongbo_Miao,

I followed your tutorial and I created a new service too. The service of current user works but when I repeated the pattern with other subscription, I get the data into the client but the html template shows this value as an undefined value. I would appreciate your help.

@puedesleerlo Some codes will help locate the issue. :slight_smile:

Thank you very much. Everything works as expected, but the changeDetection of the child component, that I set as ChangeDetectionStrategy.OnPush

parentComponent.html

    
    
 

parentComponent.ts

    constructor(private service: ServiceClass) {}
    ngOnInit() {
          this.service.seleccion();
        }
   
 

servicio.ts

    export ServiceClass extends MeteorComponent {
people = new BehaviorSubject(null);
    seleccion() {
        this.subscribe('find', () => {
          this.autorun(() => {
              this.people.next(Users.find().fetch());
          });
        });
       } 
 

ChildComponent.ts

    export class ChildComponent implements OnChanges () {
    @Input: people
`   seePeople: Person[]`
    ngOnChanges(changes) {
    if(changes.people) {
    var p = changes.people.currentValue;
    if(p) {
    this.seePeople = changes.people.currentValue;
    } }}}

ChildComponent.html

    
{{t.name}}

Which html template shows this value as an undefined value?

The child component. The problem seemed to be that the array reference doen’t change when I change the state of the observable. It was an asynchronous problem. But I fixed it calling NgZone into constructor and using the function this.zone.run() into the callback function of the subcription. And filtering the null values, as you show in your tutorial.

ngOnInit() {
      this.service.subscribePeople();
      this.subscription = this.service.people$.filter(x => !!x)
      .subscribe(v => {
        this.zone.run(() => {
          this.people = v;
        })
      })
    }

Thanks a lot. :slight_smile:

Are you using BehaviorSubjects? If not, you need change order to this:

this.subscription = this.service.people$
      .filter(x => !!x)
      .subscribe(v => {
        this.zone.run(() => {
          this.people = v;
        })
      })

this.service.subscribePeople();

If you are using BehaviorSubjects, add one line .do(() => console.log(people)), let me know what it shows:

this.subscription = this.service.people$
      .do(() => console.log(people))    // <- what shows here?
      .filter(x => !!x)
      .subscribe(v => {
        this.zone.run(() => {
          this.people = v;
        })
      })

This post has been a while, and I lost control to edit original post.

I did a Google slides, please check this post, which explains a lot of things:
:cherries: Desugar Meteor, Angular 2, RxJS 5, and ngrx

UPDATED: New Observable based Meteor API release! No need to wrap manually.
Check 🎤 New Observable based Meteor API (beta) release!

Hi @Hongbo_Miao, i’m a bit confused as what the suggested method for subscribing to data in services is now? I followed your method at the top of this page which worked really well but i’ve switched over to using the new observables and i’m struggling to get it working smoothly.

Would you be able to provide a similar break down to the one on this page but with the new observables? I would like to know the recommended method for subscribing to data that returns a single object as well as data that returns an array of objects.

Thanks!

Oh, I think right now the recommended way is using meteor-rxjs used in Socially tutorial.

In the tutorial all the data is loaded from within the component, but Angular suggests loading the data from a service. I was hoping for a comprehensive example of how to do it this way

1 Like

This Angular2-Meteor Boilerplate has a demo component that loads data from a service. It’s only a boilerplate example though:

2 Likes

I think this is an easy example of an Angular2 service using the meteor-rxjs API:

import { Injectable } from '@angular/core';
import { MeteorObservable, ObservableCursor } from 'meteor-rxjs';
import { Observable } from 'rxjs/Observable';

import { ExampleCollection } from '../../../../both/collections/example.collection';
import { Example } from '../../../../both/models/example.model';

@Injectable()
export class ExampleService {

  private examples: ObservableCursor<Example>;

  constructor() { 
    MeteorObservable.subscribe('example').subscribe(()=> {
       this.examples = ExampleCollection.find({});
       this.examples.subscribe(c => {
         console.log("example collection found:", c, typeof(c));
       })
    });
  }

  getExamplesStatic() {
    return this.examples;
  }

  getExamplesDynamic(query:Object) {
    return Observable.create(observer => {
      MeteorObservable.subscribe('example', query).zone().subscribe(()=> {
        observer.next(ExampleCollection.find(query).fetch());
        observer.complete();
      });
    }); 
  }
}

Here getExamplesStatic() relies on the subscription in the constructor. You can add an autorun() call to have examples updated automatically.

The other function getExamplesDynamic does its own subscription since it can be different every time it is called. We return a custom observable here with the fetched values from the ObservableCursor as soon as the subscription is done.
Maybe this can be made simpler, but it works.

1 Like