This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

How to use multiple subscriptions and unsubscribes? #2

Open
pantonis opened this issue Apr 12, 2017 · 25 comments

Comments

@pantonis

Hi,

Using one instance of a StompService can I subscribe to multiple queues?

@pantonis pantonis changed the title Multiple subscription question Multiple subscriptions question Apr 12, 2017
@kum-deepak
Member

kum-deepak commented Apr 12, 2017

Yes, you can subscribe to any number of queues. Just make multiple calls to subscribe.

Please check the samples as well. If you are new to dependency injection, please add the StompService to the providers list in app.module.ts (as suggested in the samples). That way the same instance of the STOMP connection will be used across your application.

@pantonis
Author

And each subscription must be assigned to a different observable?

@kum-deepak
Member

When you make a call to subscribe, a new Observable is returned, which will provide messages for that queue.

@pantonis
Author

Forgive me, I am not a web dev expert.

 this.messages = this._stompService.subscribe('/topic/ng-demo-sub');
 this.messages2 = this._stompService.subscribe('/topic/ng-demo-sub2');

or

 this.messages = this._stompService.subscribe('/topic/ng-demo-sub');
 this.messages = this._stompService.subscribe('/topic/ng-demo-sub2');

@kum-deepak
Member

The first one:

this.messages = this._stompService.subscribe('/topic/ng-demo-sub');
this.messages2 = this._stompService.subscribe('/topic/ng-demo-sub2');

@pantonis
Author

and to unsubscribe

this.subscription.unsubscribe();

is this enough for both subscriptions?

@kum-deepak
Member

kum-deepak commented Apr 12, 2017

No, the code will look something like the following (I haven't executed the code, just typed it here):

// use messages from /topic/ng-demo-sub by subscribing to the Observable
this.messages = this._stompService
  .subscribe('/topic/ng-demo-sub')
  .subscribe((msg: Message) => {
    // use it
  });

// use messages from /topic/ng-demo-sub2 by subscribing to the Observable
this.messages2 = this._stompService
  .subscribe('/topic/ng-demo-sub2')
  .subscribe((msg: Message) => {
    // use it
  });

// Unsubscribe from /topic/ng-demo-sub
this.messages.unsubscribe();
this.messages = null;

// Unsubscribe from /topic/ng-demo-sub2
this.messages2.unsubscribe();
this.messages2 = null;

The above has been updated.

@kum-deepak
Member

In summary, the two subscriptions are independent and do not affect each other.
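
As a minimal, self-contained sketch of that independence: the FakeBroker class below is a hypothetical in-memory stand-in written only for illustration. It is not the StompService API; it merely mimics the subscribe-then-subscribe shape used in the code above.

```typescript
// Hypothetical in-memory stand-in for a STOMP service; NOT the real StompService.
type Handler = (body: string) => void;

interface Unsubscribable {
  unsubscribe(): void;
}

class FakeBroker {
  private handlers = new Map<string, Set<Handler>>();

  // Mimics stompService.subscribe(destination): returns something to subscribe to.
  subscribe(destination: string): { subscribe(handler: Handler): Unsubscribable } {
    return {
      subscribe: (handler: Handler): Unsubscribable => {
        const set = this.handlers.get(destination) ?? new Set<Handler>();
        set.add(handler);
        this.handlers.set(destination, set);
        // Each subscription removes only its own handler.
        return { unsubscribe: () => { set.delete(handler); } };
      },
    };
  }

  publish(destination: string, body: string): void {
    this.handlers.get(destination)?.forEach((handler) => handler(body));
  }
}

const broker = new FakeBroker();
const received1: string[] = [];
const received2: string[] = [];

const messages = broker
  .subscribe('/topic/ng-demo-sub')
  .subscribe((body) => { received1.push(body); });
const messages2 = broker
  .subscribe('/topic/ng-demo-sub2')
  .subscribe((body) => { received2.push(body); });

broker.publish('/topic/ng-demo-sub', 'Message 01-01');
broker.publish('/topic/ng-demo-sub2', 'Message 02-01');

// Unsubscribing from the first destination leaves the second one active.
messages.unsubscribe();

broker.publish('/topic/ng-demo-sub', 'Message 01-02');
broker.publish('/topic/ng-demo-sub2', 'Message 02-02');

// Clean up the second subscription separately.
messages2.unsubscribe();
```

After the first unsubscribe, only the handler for /topic/ng-demo-sub2 keeps receiving messages.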

@pantonis
Author

ok thanks a lot

@marcelormourao

marcelormourao commented Apr 11, 2018

this.messages = this._stompService.subscribe('/topic/ng-demo-sub');
this.messages2 = this._stompService.subscribe('/topic/ng-demo-sub2');

// use messages from /topic/ng-demo-sub by subscribing to the Observable
this.messages.subscribe((msg: Message) => {
   // use it
});

// use messages from /topic/ng-demo-sub2 by subscribing to the Observable
this.messages2.subscribe((msg: Message) => {
   // use it
});

// Unsubscribe from /topic/ng-demo-sub
this.messages.unsubscribe();
this.messages = null;

// Unsubscribe from /topic/ng-demo-sub2
this.messages2.unsubscribe();
this.messages2 = null;

I am using StompRService and when I call this.messages2.unsubscribe() the queueName in the unsubscribe function is '/topic/ng-demo-sub'.

@kum-deepak kum-deepak reopened this Apr 12, 2018
@kum-deepak
Member

@marcelormourao, please create a demo project demonstrating the issue. You can fork one of the demo projects (links in the README). Thanks!

Meanwhile I am also taking a look.

@fernando-nog

I have a similar problem =(

@kum-deepak
Member

kum-deepak commented Apr 13, 2018

I have tested the following sequence (I have added these and a few more cases in src/specs/app/services/stomp-queue.operations.spec.ts):

queSubscription1 = stompService.subscribe(queueName1)
  .map((message) => message.body)
  .subscribe(spyHandler1);
queSubscription2 = stompService.subscribe(queueName2)
  .map((message) => message.body)
  .subscribe(spyHandler2);

stompService.publish(queueName1, 'Message 01-01');
stompService.publish(queueName2, 'Message 02-01');

queSubscription1.unsubscribe();

stompService.publish(queueName1, 'Message 01-02');
stompService.publish(queueName2, 'Message 02-02');

Please see the log output corresponding to the above call. Lines starting with **** are my annotations.

LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Opening Web Socket...'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Connecting...'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Web Socket Opened...'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> CONNECT
login:guest
passcode:guest
accept-version:1.2,1.1,1.0
heart-beat:0,0

'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '<<< CONNECTED
server:RabbitMQ/3.7.3
session:session-E3Pxe34zOx2lwcASq8Zgxg
heart-beat:0,0
version:1.2


'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'connected to server RabbitMQ/3.7.3'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Connected'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Will try sending queued messages '
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Request to subscribe /topic/ng-demo-sub01'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Will subscribe to /topic/ng-demo-sub01'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SUBSCRIBE
ack:auto
id:sub-0
destination:/topic/ng-demo-sub01

'
***************************************************************************************************
**** id:sub-0 is internally used, it does not correspond to the actual queue name
***************************************************************************************************
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Request to subscribe /topic/ng-demo-sub02'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Will subscribe to /topic/ng-demo-sub02'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SUBSCRIBE
ack:auto
id:sub-1
destination:/topic/ng-demo-sub02

'
***************************************************************************************************
**** id:sub-1 is internally used, it does not correspond to the actual queue name
***************************************************************************************************
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SEND
destination:/topic/ng-demo-sub01
content-length:13

Message 01-01'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SEND
destination:/topic/ng-demo-sub02
content-length:13

Message 02-01'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '<<< MESSAGE
subscription:sub-0
destination:/topic/ng-demo-sub01
message-id:T_sub-0@@session-E3Pxe34zOx2lwcASq8Zgxg@@1
redelivered:false
content-length:13

Message 01-01
'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '<<< MESSAGE
subscription:sub-1
destination:/topic/ng-demo-sub02
message-id:T_sub-1@@session-E3Pxe34zOx2lwcASq8Zgxg@@2
redelivered:false
content-length:13

Message 02-01
'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Stop watching connection state (for /topic/ng-demo-sub01)'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Will unsubscribe from /topic/ng-demo-sub01 at Stomp'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> UNSUBSCRIBE
id:sub-0

'
***************************************************************************************************
**** Unsubscribed from /topic/ng-demo-sub01
**** id:sub-0 is internally used, it does not correspond to the actual queue name
***************************************************************************************************
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SEND
destination:/topic/ng-demo-sub01
content-length:13

Message 01-02'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SEND
destination:/topic/ng-demo-sub02
content-length:13

Message 02-02'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '<<< MESSAGE
subscription:sub-1
destination:/topic/ng-demo-sub02
message-id:T_sub-1@@session-E3Pxe34zOx2lwcASq8Zgxg@@3
redelivered:false
content-length:13

Message 02-02
'
***************************************************************************************************
**** Message is only received for /topic/ng-demo-sub02
***************************************************************************************************
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> DISCONNECT
receipt:close-2

'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '<<< RECEIPT
receipt-id:close-2


'

@kum-deepak
Member

Please note that I have updated my schematic code at #2 (comment)

@kum-deepak kum-deepak changed the title Multiple subscriptions question How to use multiple subscriptions and unsubscribes? May 5, 2018
@evgenyfedorenko

@kum-deepak I am having a similar issue, which I reported in #73. Basically, it does not work if I specify the subscription callback as an inline function. I did not try an anonymous function like the one in your example; that might work, but I would still like to be able to pass an inline function as the callback to the topic Observable's subscribe. Thanks

@tomekstankowski

If you pass STOMP headers to the subscribe method, as in stompService.subscribe(destination, headers), make sure you create a new headers object for each call to subscribe. The headers are modified internally, and that causes problems with subscriptions.
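
A minimal sketch of why a shared headers object can cause trouble, and of the workaround. It assumes, purely for illustration, that the subscribe call writes a generated id into the headers it receives; subscribeMutating, registered and makeHeaders are hypothetical stand-ins, not functions from the library.

```typescript
type StompHeaders = { [key: string]: string };

let nextId = 0;
const registered: { id: string; destination: string }[] = [];

// Hypothetical stand-in for a subscribe call that writes into the caller's headers.
function subscribeMutating(destination: string, headers: StompHeaders): string {
  // Assumption: an id is generated only when the headers do not already carry one.
  const id = headers['id'] || `sub-${nextId++}`;
  headers['id'] = id; // mutation: the caller's object now carries this id
  registered.push({ id, destination });
  return id;
}

// Problem: one shared headers object, so the second call reuses the first call's id.
const shared: StompHeaders = { ack: 'auto' };
const sharedId1 = subscribeMutating('/topic/ng-demo-sub', shared);
const sharedId2 = subscribeMutating('/topic/ng-demo-sub2', shared);

// Workaround: build a new headers object for every call.
const makeHeaders = (): StompHeaders => ({ ack: 'auto' });
const freshId1 = subscribeMutating('/topic/ng-demo-sub', makeHeaders());
const freshId2 = subscribeMutating('/topic/ng-demo-sub2', makeHeaders());
```

With the shared object, both subscriptions end up with the same id, so an unsubscribe for one can be attributed to the other; with fresh objects, each call gets its own id.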

@kum-deepak
Member

@s1vert, this seems a bit peculiar. Please fork any of the samples and modify it to show the issue. I would like to have a look at it.

@kum-deepak
Member

@s1vert I think I understand the issue now. It will need to be fixed in the underlying library as well as in this one. It will be done in the next major release.

@sahyun1

sahyun1 commented Nov 1, 2018

Would this be part of 7.0.0?

@kum-deepak
Member

@sahyun1, this thread mentions two different issues. The original question, multiple subscriptions, has always worked.

The other issue was that some functions were altering the headers argument. It has been fixed in the underlying library (stomp-js/stompjs#11) and will be part of v7.

If you mean any other issue, please create a new one 😄

@ramakanthreddyk

No, the code will look something like the following (I haven't executed the code, just typed it here):

// use messages from /topic/ng-demo-sub by subscribing to the Observable
this.messages = this._stompService
  .subscribe('/topic/ng-demo-sub')
  .subscribe((msg: Message) => {
    // use it
  });

// use messages from /topic/ng-demo-sub2 by subscribing to the Observable
this.messages2 = this._stompService
  .subscribe('/topic/ng-demo-sub2')
  .subscribe((msg: Message) => {
    // use it
  });

// Unsubscribe from /topic/ng-demo-sub
this.messages.unsubscribe();
this.messages = null;

// Unsubscribe from /topic/ng-demo-sub2
this.messages2.unsubscribe();
this.messages2 = null;

The above has been updated.

Can't we do it dynamically?

@kum-deepak
Member

Could not understand your query. Please post some code sample.

@ramakanthreddyk

Could not understand your query. Please post some code sample.

get(demo) {
  this._stompService
    .subscribe(`/topic/${demo}`)
    .subscribe((msg: Message) => {
      // use this message
    });
}

this.get('ng-demo-sub');
this.get('ng-demo-sub2');

// Unsubscribe from /topic/ng-demo-sub
this.messages.unsubscribe();
this.messages = null;

Can't we unsubscribe from one observable?

@kum-deepak
Member

Probably you want the following (I haven't executed the code, just typed here):

get(topic) {
  // Return the subscription so that the caller can unsubscribe later
  return this._stompService.subscribe(`/topic/${topic}`).subscribe((message) => {
    // Do something
  });
}

// Subscribe
this.messages = this.get('demo1');
this.messages2 = this.get('demo2');

// Unsubscribe from /topic/demo1
this.messages.unsubscribe();
this.messages = null;

// Unsubscribe from /topic/demo2
this.messages2.unsubscribe();
this.messages2 = null;
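
If the set of topics is only known at runtime, one option is to keep the returned subscriptions in a map keyed by topic. This is a minimal sketch that assumes only that each subscription exposes unsubscribe(). TopicRegistry and its open callback are hypothetical names; the callback stands in for a call such as get(topic) above.

```typescript
interface Unsubscribable {
  unsubscribe(): void;
}

// Hypothetical helper: tracks at most one subscription per topic name.
class TopicRegistry {
  private active = new Map<string, Unsubscribable>();

  // `open` stands in for something like (topic) => this.get(topic).
  constructor(private open: (topic: string) => Unsubscribable) {}

  subscribe(topic: string): void {
    if (!this.active.has(topic)) {
      this.active.set(topic, this.open(topic));
    }
  }

  unsubscribe(topic: string): void {
    const subscription = this.active.get(topic);
    if (subscription) {
      subscription.unsubscribe();
      this.active.delete(topic);
    }
  }

  topics(): string[] {
    return Array.from(this.active.keys());
  }
}

// Usage with a fake `open` that only records which topics were closed.
const closed: string[] = [];
const registry = new TopicRegistry((topic) => ({
  unsubscribe: () => { closed.push(topic); },
}));

registry.subscribe('demo1');
registry.subscribe('demo2');
registry.unsubscribe('demo1'); // only demo1 is closed; demo2 stays active
```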

@dJani97

dJani97 commented Jul 29, 2022

And each subscription must be assigned to a different observable?

If the number of new variables gets out of hand, you may use an array to collect all the subscriptions and use a loop to unsubscribe in the end. This also guarantees that you don't forget about any subscriptions.

This looks like a good place to use a pattern like subsink. You just throw a bunch of subscriptions at it, then call unsubscribe once. The code behind this library is dead simple, if it doesn't suit you, it's pretty easy to make your own implementation.
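
A rough sketch of such a home-made collector, assuming only that every subscription has an unsubscribe() method. SubscriptionBag is a hypothetical name, and this is not the subsink API.

```typescript
interface Unsubscribable {
  unsubscribe(): void;
}

// Hypothetical collector: gather subscriptions, then release them all at once.
class SubscriptionBag {
  private subscriptions: Unsubscribable[] = [];

  add(...subscriptions: Unsubscribable[]): void {
    this.subscriptions.push(...subscriptions);
  }

  unsubscribeAll(): void {
    for (const subscription of this.subscriptions) {
      subscription.unsubscribe();
    }
    // Forget the released subscriptions so they are not closed twice.
    this.subscriptions = [];
  }

  get size(): number {
    return this.subscriptions.length;
  }
}

// Fake subscriptions that only count how many times unsubscribe() was called.
let closedCount = 0;
const fakeSubscription = (): Unsubscribable => ({
  unsubscribe: () => { closedCount += 1; },
});

const bag = new SubscriptionBag();
bag.add(fakeSubscription(), fakeSubscription());
bag.add(fakeSubscription());

// In an Angular component this call would typically go in ngOnDestroy.
bag.unsubscribeAll();
```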
