1313import com .pubnub .api .enums .PNHeartbeatNotificationOptions ;
1414import com .pubnub .api .enums .PNStatusCategory ;
1515import com .pubnub .api .models .consumer .PNStatus ;
16- import com .pubnub .api .models .server .Envelope ;
1716import com .pubnub .api .models .server .SubscribeEnvelope ;
1817import com .pubnub .api .models .server .SubscribeMessage ;
1918import com .pubnub .api .workers .SubscribeMessageWorker ;
2019import lombok .extern .slf4j .Slf4j ;
21- import retrofit2 .Call ;
2220
2321import java .util .List ;
2422import java .util .Map ;
@@ -32,8 +30,8 @@ public class SubscriptionManager {
3230 private static final int HEARTBEAT_INTERVAL_MULTIPLIER = 1000 ;
3331
3432 private PubNub pubnub ;
35- private Call < SubscribeEnvelope > subscribeCall ;
36- private Call < Envelope > heartbeatCall ;
33+ private Subscribe subscribeCall ;
34+ private Heartbeat heartbeatCall ;
3735
3836 private LinkedBlockingQueue <SubscribeMessage > messageQueue ;
3937
@@ -186,61 +184,63 @@ private void startSubscribeLoop() {
186184 subscribeCall = new Subscribe (pubnub )
187185 .channels (combinedChannels ).channelGroups (combinedChannelGroups )
188186 .timetoken (timetoken ).region (region )
189- .filterExpression (pubnub .getConfiguration ().getFilterExpression ())
190- .async (new PNCallback <SubscribeEnvelope >() {
191- @ Override
192- public void onResponse (final SubscribeEnvelope result , final PNStatus status ) {
193- if (status .isError ()) {
194-
195- if (status .getCategory () == PNStatusCategory .PNTimeoutCategory ) {
196- startSubscribeLoop ();
197- } else {
198- disconnect ();
199- listenerManager .announce (status );
200-
201- // stop all announcements and ask the reconnection manager to start polling for connection restoration..
202- reconnectionManager .startPolling ();
203- }
204-
205- return ;
206- }
207-
208- if (!subscriptionStatusAnnounced ) {
209- PNStatus pnStatus = PNStatus .builder ()
210- .error (false )
211- .category (PNStatusCategory .PNConnectedCategory )
212- .statusCode (status .getStatusCode ())
213- .authKey (status .getAuthKey ())
214- .operation (status .getOperation ())
215- .clientRequest (status .getClientRequest ())
216- .origin (status .getOrigin ())
217- .tlsEnabled (status .isTlsEnabled ())
218- .build ();
219-
220- subscriptionStatusAnnounced = true ;
221- listenerManager .announce (pnStatus );
222- }
223-
224- if (result .getMessages ().size () != 0 ) {
225- messageQueue .addAll (result .getMessages ());
226- }
227-
228- timetoken = result .getMetadata ().getTimetoken ();
229- region = result .getMetadata ().getRegion ();
187+ .filterExpression (pubnub .getConfiguration ().getFilterExpression ());
188+
189+ subscribeCall .async (new PNCallback <SubscribeEnvelope >() {
190+ @ Override
191+ public void onResponse (final SubscribeEnvelope result , final PNStatus status ) {
192+ if (status .isError ()) {
193+
194+ if (status .getCategory () == PNStatusCategory .PNTimeoutCategory ) {
230195 startSubscribeLoop ();
196+ } else {
197+ disconnect ();
198+ listenerManager .announce (status );
199+
200+ // stop all announcements and ask the reconnection manager to start polling for connection restoration..
201+ reconnectionManager .startPolling ();
231202 }
232- });
203+
204+ return ;
205+ }
206+
207+ if (!subscriptionStatusAnnounced ) {
208+ PNStatus pnStatus = PNStatus .builder ()
209+ .error (false )
210+ .category (PNStatusCategory .PNConnectedCategory )
211+ .statusCode (status .getStatusCode ())
212+ .authKey (status .getAuthKey ())
213+ .operation (status .getOperation ())
214+ .clientRequest (status .getClientRequest ())
215+ .origin (status .getOrigin ())
216+ .tlsEnabled (status .isTlsEnabled ())
217+ .build ();
218+
219+ subscriptionStatusAnnounced = true ;
220+ listenerManager .announce (pnStatus );
221+ }
222+
223+ if (result .getMessages ().size () != 0 ) {
224+ messageQueue .addAll (result .getMessages ());
225+ }
226+
227+ timetoken = result .getMetadata ().getTimetoken ();
228+ region = result .getMetadata ().getRegion ();
229+ startSubscribeLoop ();
230+ }
231+ });
232+
233233 }
234234
235235 private void stopSubscribeLoop () {
236- if (subscribeCall != null && ! subscribeCall . isExecuted () && ! subscribeCall . isCanceled () ) {
237- subscribeCall .cancel ();
236+ if (subscribeCall != null ) {
237+ subscribeCall .silentCancel ();
238238 }
239239 }
240240
241241 private void performHeartbeatLoop () {
242- if (heartbeatCall != null && ! heartbeatCall . isCanceled () && ! heartbeatCall . isExecuted () ) {
243- heartbeatCall .cancel ();
242+ if (heartbeatCall != null ) {
243+ heartbeatCall .silentCancel ();
244244 }
245245
246246 List <String > presenceChannels = this .subscriptionState .prepareChannelList (false );
@@ -253,26 +253,28 @@ private void performHeartbeatLoop() {
253253 }
254254
255255 heartbeatCall = new Heartbeat (pubnub )
256- .channels (presenceChannels ).channelGroups (presenceChannelGroups ).state (stateStorage )
257- .async (new PNCallback <Boolean >() {
258- @ Override
259- public void onResponse (Boolean result , PNStatus status ) {
260- PNHeartbeatNotificationOptions heartbeatVerbosity = pubnub
261- .getConfiguration ().getHeartbeatNotificationOptions ();
262-
263- if (status .isError ()) {
264- if (heartbeatVerbosity == PNHeartbeatNotificationOptions .ALL
265- || heartbeatVerbosity == PNHeartbeatNotificationOptions .FAILURES ) {
266- listenerManager .announce (status );
267- }
268-
269- } else {
270- if (heartbeatVerbosity == PNHeartbeatNotificationOptions .ALL ) {
271- listenerManager .announce (status );
272- }
273- }
256+ .channels (presenceChannels ).channelGroups (presenceChannelGroups ).state (stateStorage );
257+
258+ heartbeatCall .async (new PNCallback <Boolean >() {
259+ @ Override
260+ public void onResponse (Boolean result , PNStatus status ) {
261+ PNHeartbeatNotificationOptions heartbeatVerbosity = pubnub
262+ .getConfiguration ().getHeartbeatNotificationOptions ();
263+
264+ if (status .isError ()) {
265+ if (heartbeatVerbosity == PNHeartbeatNotificationOptions .ALL
266+ || heartbeatVerbosity == PNHeartbeatNotificationOptions .FAILURES ) {
267+ listenerManager .announce (status );
274268 }
275- });
269+
270+ } else {
271+ if (heartbeatVerbosity == PNHeartbeatNotificationOptions .ALL ) {
272+ listenerManager .announce (status );
273+ }
274+ }
275+ }
276+ });
277+
276278 }
277279
278280}
0 commit comments