Skip to content

Commit 9ec1d39

Browse files
author
Max Presman
committed
4.0.1 / adjusting resubscribes
1 parent 5130c4f commit 9ec1d39

File tree

4 files changed

+104
-81
lines changed

4 files changed

+104
-81
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
4.0.0
1+
4.0.1

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ plugins {
99
id 'findbugs'
1010
}
1111
group = 'com.pubnub'
12-
version = '4.0.0'
12+
version = '4.0.1'
1313

1414
description = """"""
1515

src/main/java/com/pubnub/api/endpoints/Endpoint.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ public abstract class Endpoint<Input, Output> {
3535
@Getter(AccessLevel.NONE)
3636
private PNCallback<Output> cachedCallback;
3737

38+
@Getter(AccessLevel.NONE)
39+
private Call<Input> call;
40+
41+
/**
42+
* If the endpoint failed to execute and we do not want to alert the user, flip this to true
43+
* This operation is handy if we internally cancelled the endpoint.
44+
*/
45+
@Getter(AccessLevel.NONE)
46+
private boolean silenceFailures;
47+
3848
private static final int SERVER_RESPONSE_SUCCESS = 200;
3949
private static final int SERVER_RESPONSE_FORBIDDEN = 403;
4050
private static final int SERVER_RESPONSE_BAD_REQUEST = 400;
@@ -47,7 +57,7 @@ public Endpoint(final PubNub pubnubInstance) {
4757
public final Output sync() throws PubNubException {
4858
this.validateParams();
4959

50-
Call<Input> call = doWork(createBaseParams());
60+
call = doWork(createBaseParams());
5161
Response<Input> serverResponse;
5262
Output response;
5363

@@ -83,9 +93,8 @@ public final Output sync() throws PubNubException {
8393
return response;
8494
}
8595

86-
public final Call<Input> async(final PNCallback<Output> callback) {
96+
public final void async(final PNCallback<Output> callback) {
8797
cachedCallback = callback;
88-
Call<Input> call;
8998

9099
try {
91100
call = doWork(createBaseParams());
@@ -97,13 +106,13 @@ public final Call<Input> async(final PNCallback<Output> callback) {
97106
.build();
98107

99108
callback.onResponse(null, createStatusResponse(PNStatusCategory.PNBadRequestCategory, null, pubnubException));
100-
return null;
109+
return;
101110
}
102111

103112
call.enqueue(new retrofit2.Callback<Input>() {
104113

105114
@Override
106-
public void onResponse(final Call<Input> call, final Response<Input> response) {
115+
public void onResponse(final Call<Input> performedCall, final Response<Input> response) {
107116
Output callbackResponse;
108117

109118
if (!response.isSuccessful() || response.code() != SERVER_RESPONSE_SUCCESS) {
@@ -149,16 +158,19 @@ public void onResponse(final Call<Input> call, final Response<Input> response) {
149158
return;
150159
}
151160

152-
callback.onResponse(callbackResponse, createStatusResponse(null, response, null));
161+
callback.onResponse(callbackResponse, createStatusResponse(PNStatusCategory.PNAcknowledgmentCategory, response, null));
153162
}
154163

155164
@Override
156-
public void onFailure(final Call<Input> call, final Throwable throwable) {
165+
public void onFailure(final Call<Input> performedCall, final Throwable throwable) {
166+
if (silenceFailures) {
167+
return;
168+
}
169+
157170
PNStatusCategory pnStatusCategory = PNStatusCategory.PNBadRequestCategory;
158171
PubNubException.PubNubExceptionBuilder pubnubException = PubNubException.builder()
159172
.errormsg(throwable.getMessage());
160173

161-
162174
try {
163175
throw throwable;
164176
} catch (UnknownHostException networkException) {
@@ -178,14 +190,23 @@ public void onFailure(final Call<Input> call, final Throwable throwable) {
178190

179191
}
180192
});
181-
182-
return call;
183193
}
184194

185195
public void retry() {
196+
silenceFailures = false;
186197
async(cachedCallback);
187198
};
188199

200+
/**
201+
* cancel the operation but do not alert anybody, useful for restarting the heartbeats and subscribe loops.
202+
*/
203+
public void silentCancel() {
204+
if (call != null && !call.isCanceled()) {
205+
this.silenceFailures = true;
206+
call.cancel();
207+
}
208+
}
209+
189210
private PNStatus createStatusResponse(PNStatusCategory category, Response<Input> response, Exception throwable) {
190211
PNStatus.PNStatusBuilder pnStatus = PNStatus.builder();
191212

src/main/java/com/pubnub/api/managers/SubscriptionManager.java

Lines changed: 71 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,10 @@
1313
import com.pubnub.api.enums.PNHeartbeatNotificationOptions;
1414
import com.pubnub.api.enums.PNStatusCategory;
1515
import com.pubnub.api.models.consumer.PNStatus;
16-
import com.pubnub.api.models.server.Envelope;
1716
import com.pubnub.api.models.server.SubscribeEnvelope;
1817
import com.pubnub.api.models.server.SubscribeMessage;
1918
import com.pubnub.api.workers.SubscribeMessageWorker;
2019
import lombok.extern.slf4j.Slf4j;
21-
import retrofit2.Call;
2220

2321
import java.util.List;
2422
import java.util.Map;
@@ -32,8 +30,8 @@ public class SubscriptionManager {
3230
private static final int HEARTBEAT_INTERVAL_MULTIPLIER = 1000;
3331

3432
private PubNub pubnub;
35-
private Call<SubscribeEnvelope> subscribeCall;
36-
private Call<Envelope> heartbeatCall;
33+
private Subscribe subscribeCall;
34+
private Heartbeat heartbeatCall;
3735

3836
private LinkedBlockingQueue<SubscribeMessage> messageQueue;
3937

@@ -186,61 +184,63 @@ private void startSubscribeLoop() {
186184
subscribeCall = new Subscribe(pubnub)
187185
.channels(combinedChannels).channelGroups(combinedChannelGroups)
188186
.timetoken(timetoken).region(region)
189-
.filterExpression(pubnub.getConfiguration().getFilterExpression())
190-
.async(new PNCallback<SubscribeEnvelope>() {
191-
@Override
192-
public void onResponse(final SubscribeEnvelope result, final PNStatus status) {
193-
if (status.isError()) {
194-
195-
if (status.getCategory() == PNStatusCategory.PNTimeoutCategory) {
196-
startSubscribeLoop();
197-
} else {
198-
disconnect();
199-
listenerManager.announce(status);
200-
201-
// stop all announcements and ask the reconnection manager to start polling for connection restoration..
202-
reconnectionManager.startPolling();
203-
}
204-
205-
return;
206-
}
207-
208-
if (!subscriptionStatusAnnounced) {
209-
PNStatus pnStatus = PNStatus.builder()
210-
.error(false)
211-
.category(PNStatusCategory.PNConnectedCategory)
212-
.statusCode(status.getStatusCode())
213-
.authKey(status.getAuthKey())
214-
.operation(status.getOperation())
215-
.clientRequest(status.getClientRequest())
216-
.origin(status.getOrigin())
217-
.tlsEnabled(status.isTlsEnabled())
218-
.build();
219-
220-
subscriptionStatusAnnounced = true;
221-
listenerManager.announce(pnStatus);
222-
}
223-
224-
if (result.getMessages().size() != 0) {
225-
messageQueue.addAll(result.getMessages());
226-
}
227-
228-
timetoken = result.getMetadata().getTimetoken();
229-
region = result.getMetadata().getRegion();
187+
.filterExpression(pubnub.getConfiguration().getFilterExpression());
188+
189+
subscribeCall.async(new PNCallback<SubscribeEnvelope>() {
190+
@Override
191+
public void onResponse(final SubscribeEnvelope result, final PNStatus status) {
192+
if (status.isError()) {
193+
194+
if (status.getCategory() == PNStatusCategory.PNTimeoutCategory) {
230195
startSubscribeLoop();
196+
} else {
197+
disconnect();
198+
listenerManager.announce(status);
199+
200+
// stop all announcements and ask the reconnection manager to start polling for connection restoration..
201+
reconnectionManager.startPolling();
231202
}
232-
});
203+
204+
return;
205+
}
206+
207+
if (!subscriptionStatusAnnounced) {
208+
PNStatus pnStatus = PNStatus.builder()
209+
.error(false)
210+
.category(PNStatusCategory.PNConnectedCategory)
211+
.statusCode(status.getStatusCode())
212+
.authKey(status.getAuthKey())
213+
.operation(status.getOperation())
214+
.clientRequest(status.getClientRequest())
215+
.origin(status.getOrigin())
216+
.tlsEnabled(status.isTlsEnabled())
217+
.build();
218+
219+
subscriptionStatusAnnounced = true;
220+
listenerManager.announce(pnStatus);
221+
}
222+
223+
if (result.getMessages().size() != 0) {
224+
messageQueue.addAll(result.getMessages());
225+
}
226+
227+
timetoken = result.getMetadata().getTimetoken();
228+
region = result.getMetadata().getRegion();
229+
startSubscribeLoop();
230+
}
231+
});
232+
233233
}
234234

235235
private void stopSubscribeLoop() {
236-
if (subscribeCall != null && !subscribeCall.isExecuted() && !subscribeCall.isCanceled()) {
237-
subscribeCall.cancel();
236+
if (subscribeCall != null) {
237+
subscribeCall.silentCancel();
238238
}
239239
}
240240

241241
private void performHeartbeatLoop() {
242-
if (heartbeatCall != null && !heartbeatCall.isCanceled() && !heartbeatCall.isExecuted()) {
243-
heartbeatCall.cancel();
242+
if (heartbeatCall != null) {
243+
heartbeatCall.silentCancel();
244244
}
245245

246246
List<String> presenceChannels = this.subscriptionState.prepareChannelList(false);
@@ -253,26 +253,28 @@ private void performHeartbeatLoop() {
253253
}
254254

255255
heartbeatCall = new Heartbeat(pubnub)
256-
.channels(presenceChannels).channelGroups(presenceChannelGroups).state(stateStorage)
257-
.async(new PNCallback<Boolean>() {
258-
@Override
259-
public void onResponse(Boolean result, PNStatus status) {
260-
PNHeartbeatNotificationOptions heartbeatVerbosity = pubnub
261-
.getConfiguration().getHeartbeatNotificationOptions();
262-
263-
if (status.isError()) {
264-
if (heartbeatVerbosity == PNHeartbeatNotificationOptions.ALL
265-
|| heartbeatVerbosity == PNHeartbeatNotificationOptions.FAILURES) {
266-
listenerManager.announce(status);
267-
}
268-
269-
} else {
270-
if (heartbeatVerbosity == PNHeartbeatNotificationOptions.ALL) {
271-
listenerManager.announce(status);
272-
}
273-
}
256+
.channels(presenceChannels).channelGroups(presenceChannelGroups).state(stateStorage);
257+
258+
heartbeatCall.async(new PNCallback<Boolean>() {
259+
@Override
260+
public void onResponse(Boolean result, PNStatus status) {
261+
PNHeartbeatNotificationOptions heartbeatVerbosity = pubnub
262+
.getConfiguration().getHeartbeatNotificationOptions();
263+
264+
if (status.isError()) {
265+
if (heartbeatVerbosity == PNHeartbeatNotificationOptions.ALL
266+
|| heartbeatVerbosity == PNHeartbeatNotificationOptions.FAILURES) {
267+
listenerManager.announce(status);
274268
}
275-
});
269+
270+
} else {
271+
if (heartbeatVerbosity == PNHeartbeatNotificationOptions.ALL) {
272+
listenerManager.announce(status);
273+
}
274+
}
275+
}
276+
});
277+
276278
}
277279

278280
}

0 commit comments

Comments
 (0)