diff --git a/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java b/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java index 14baa2c..3cfdfea 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java @@ -111,10 +111,20 @@ public boolean set(V response) { "No handler is set when response is set. Resource leak may occur.", new IllegalStateException() // log the stacktrace ); + if (response != null) { + response.touch("No Listener ;( Tfuture.set(...)"); + } } this.response = response; - return super.set(response); + if (response != null) { + response.touch("Tfuture.set(...) about to set"); + } + boolean result = super.set(response); + if (response != null) { + response.touch("Tfuture.set(...) was set"); + } + return result; } @Override @@ -123,7 +133,11 @@ public boolean setException(Throwable throwable) { } @Override - public void addListener(final Runnable listener, Executor exec) { + public void addListener(final Runnable listener, final Executor exec) { + final String listenerAsString = "listener:" + listener.getClass() + "/" + listener.toString(); + if (response != null) { + response.touch("TFuture.addListener(..) "+ listenerAsString); + } listenerCount.incrementAndGet(); // this is the current span of whomever is adding the listener - preserve it for the invocation of the latter final Span span = tracingContext != null && tracingContext.hasSpan() ? tracingContext.currentSpan() : null; @@ -133,6 +147,9 @@ public void run() { try { try { pushSpan(span); + if (response != null) { + response.touch("TFuture.addListener(..) - before listener run(" + listenerAsString + ")"); + } listener.run(); } finally { popSpan(span, listener); @@ -142,6 +159,9 @@ public void run() { // a) whether listener actually ran or not; // b) whether tracing was pushed/popped or not; int remainingListeners = listenerCount.decrementAndGet(); + if (response != null) { + response.touch("TFuture.addListener(..). About to release, remainingListeners:" + remainingListeners); + } if (remainingListeners <= 0) { if (response != null) { response.release(); @@ -153,6 +173,9 @@ public void run() { } private void popSpan(Span span, Runnable listener) { + if (response != null) { + response.touch("TFuture.popSpan(..) "); + } if (span != null) { try { // this _might_ fail in case the listener managed to corrupt the tracing context Span poppedSpan = tracingContext.popSpan(); @@ -172,6 +195,9 @@ private void pushSpan(Span span) { if (span != null) { tracingContext.pushSpan(span); } + if (response != null) { + response.touch("TFuture.pushSpan(..) "); + } } @Override @@ -184,6 +210,11 @@ public V get() throws InterruptedException, ExecutionException { // multiple times if INTERRUPTED, see Uninterruptibles#getUninterruptibly(java.util.concurrent.Future) listenerCount.incrementAndGet(); + if (response != null) { + response.touch( + "TFuture.get(..) - listenerCount incremented, new value :" + listenerCount.get()); + } + return result; } diff --git a/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java b/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java index 404c6c6..8304f82 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java @@ -126,6 +126,10 @@ public void release() { } request.release(); + + if (lastError != null) { + lastError.touch("OutRequest.release()"); + } } public boolean isUsedPeer(SocketAddress address) { @@ -141,6 +145,9 @@ public void setUsedPeer(SocketAddress address) { } public void setLastError(@NotNull ErrorResponse lastError) { + if (this.lastError != null) { + this.lastError.touch("Evicted previous last error OutRequest.setLastError(...)"); + } this.lastError = lastError; } @@ -155,6 +162,12 @@ public void setLastError(ErrorType errorType, String message) { public void setFuture(@NotNull Response response) { release(); setResponseFuture(request.getArgScheme(), response); +// if (lastError != null) { +// lastError.release(); +// } + if (lastError != null) { + lastError.touch("OutRequest.setFuture(...) left last error non-released"); + } } public void setFuture() { @@ -162,11 +175,17 @@ public void setFuture() { } public void handleResponse(@NotNull ResponseMessage response) { + if (response != null) { + response.touch("OutRequest.handleResponse(...)"); + } if (!response.isError()) { setFuture((Response) response); return; } + if (response != null) { + response.touch("OutRequest.handleResponse-setLastErrors(...)"); + } setLastError((ErrorResponse)response); // reset the read index of args for retries @@ -176,6 +195,10 @@ public void handleResponse(@NotNull ResponseMessage response) { @SuppressWarnings({"rawtypes", "unchecked"}) protected void setResponseFuture(ArgScheme argScheme, Response response) { + if (response != null) { + String requestString = request == null ? "" : this.request.toString(); + response.touch("OutRequest.setResponseFuture(" + argScheme + ", " + requestString + ")"); + } switch (argScheme) { case RAW: ((TFuture)future).set((RawResponse) response);