From 9c9b902de2d7d85ecd50ad09ddb1711c52f56698 Mon Sep 17 00:00:00 2001 From: Denys Kurylenko Date: Fri, 6 Mar 2020 23:25:21 -0800 Subject: [PATCH 1/8] Safe response release --- tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java b/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java index 14baa2c..f3e5c5e 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java @@ -147,7 +147,7 @@ public void run() { response.release(); } } - } + fz } } }, exec); } From ceb870191b5ce8dfc8caddf4e47d5e357f04eb51 Mon Sep 17 00:00:00 2001 From: Denyska Date: Sat, 7 Mar 2020 00:11:30 -0800 Subject: [PATCH 2/8] Make release null-safe --- tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java b/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java index f3e5c5e..14baa2c 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java @@ -147,7 +147,7 @@ public void run() { response.release(); } } - fz } + } } }, exec); } From b47021150aa0855eed2b3c5c96bd0a8639784b17 Mon Sep 17 00:00:00 2001 From: Denyska Kurylenko Date: Sat, 7 Mar 2020 11:22:13 -0800 Subject: [PATCH 3/8] Don't count Response listener if result is not sucessfully returned --- .../src/test/java/com/uber/tchannel/api/TFutureTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java b/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java index 58b73f7..e6c9b2f 100644 --- a/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java +++ b/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java @@ -1,6 +1,9 @@ package com.uber.tchannel.api; +<<<<<<< HEAD import com.google.common.util.concurrent.MoreExecutors; +======= +>>>>>>> Don't count Response listener if result is not sucessfully returned import com.uber.tchannel.headers.ArgScheme; import com.uber.tchannel.messages.ThriftRequest; import com.uber.tchannel.messages.ThriftResponse; @@ -15,8 +18,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +<<<<<<< HEAD import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +======= +>>>>>>> Don't count Response listener if result is not sucessfully returned public class TFutureTest { @@ -71,6 +77,7 @@ public void run() { assertNull(response.getArg3()); } +<<<<<<< HEAD @Test public void testTfutureResponseNotAutoReleasedIfBlockingGetCalledFirst() throws Exception { ThriftResponse response = prepareResponse(); @@ -148,6 +155,8 @@ public void run() { assertNull(response.getArg3()); } +======= +>>>>>>> Don't count Response listener if result is not sucessfully returned @NotNull private ThriftResponse prepareResponse() { ThriftRequest request = new ThriftRequest.Builder("keyvalue-service", "KeyValue::setValue") From e5d53557818a9f334b8b87f828e3cec202fa632b Mon Sep 17 00:00:00 2001 From: Denyska Kurylenko Date: Sat, 7 Mar 2020 11:38:41 -0800 Subject: [PATCH 4/8] more tests --- .../src/test/java/com/uber/tchannel/api/TFutureTest.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java b/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java index e6c9b2f..58b73f7 100644 --- a/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java +++ b/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java @@ -1,9 +1,6 @@ package com.uber.tchannel.api; -<<<<<<< HEAD import com.google.common.util.concurrent.MoreExecutors; -======= ->>>>>>> Don't count Response listener if result is not sucessfully returned import com.uber.tchannel.headers.ArgScheme; import com.uber.tchannel.messages.ThriftRequest; import com.uber.tchannel.messages.ThriftResponse; @@ -18,11 +15,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -<<<<<<< HEAD import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; -======= ->>>>>>> Don't count Response listener if result is not sucessfully returned public class TFutureTest { @@ -77,7 +71,6 @@ public void run() { assertNull(response.getArg3()); } -<<<<<<< HEAD @Test public void testTfutureResponseNotAutoReleasedIfBlockingGetCalledFirst() throws Exception { ThriftResponse response = prepareResponse(); @@ -155,8 +148,6 @@ public void run() { assertNull(response.getArg3()); } -======= ->>>>>>> Don't count Response listener if result is not sucessfully returned @NotNull private ThriftResponse prepareResponse() { ThriftRequest request = new ThriftRequest.Builder("keyvalue-service", "KeyValue::setValue") From a82bba2a2854b4b572372d10dbda8a0f394c8b5b Mon Sep 17 00:00:00 2001 From: Denyska Kurylenko Date: Sat, 7 Mar 2020 12:01:17 -0800 Subject: [PATCH 5/8] more tests cases --- .../src/test/java/com/uber/tchannel/api/TFutureTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java b/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java index 58b73f7..68b3e1f 100644 --- a/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java +++ b/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java @@ -1,6 +1,7 @@ package com.uber.tchannel.api; import com.google.common.util.concurrent.MoreExecutors; +import com.uber.tchannel.api.handlers.TFutureCallback; import com.uber.tchannel.headers.ArgScheme; import com.uber.tchannel.messages.ThriftRequest; import com.uber.tchannel.messages.ThriftResponse; @@ -12,6 +13,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; From aeef543dc68f454c9cdcc91b2ce61124ad88acd4 Mon Sep 17 00:00:00 2001 From: Denys Kurylenko Date: Sat, 7 Mar 2020 20:03:03 -0800 Subject: [PATCH 6/8] Fix mem leak in HyperbahnClient --- .../src/test/java/com/uber/tchannel/api/TFutureTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java b/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java index 68b3e1f..58b73f7 100644 --- a/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java +++ b/tchannel-core/src/test/java/com/uber/tchannel/api/TFutureTest.java @@ -1,7 +1,6 @@ package com.uber.tchannel.api; import com.google.common.util.concurrent.MoreExecutors; -import com.uber.tchannel.api.handlers.TFutureCallback; import com.uber.tchannel.headers.ArgScheme; import com.uber.tchannel.messages.ThriftRequest; import com.uber.tchannel.messages.ThriftResponse; @@ -13,7 +12,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; From 224ec80318e3ee03104f9afaea81a999ee49776b Mon Sep 17 00:00:00 2001 From: Denys Kurylenko Date: Fri, 6 Mar 2020 21:00:14 -0800 Subject: [PATCH 7/8] more tracing --- .../java/com/uber/tchannel/api/TFuture.java | 33 ++++++++++++++++++- .../uber/tchannel/handlers/OutRequest.java | 22 +++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java b/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java index 14baa2c..680c1a1 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java @@ -111,10 +111,20 @@ public boolean set(V response) { "No handler is set when response is set. Resource leak may occur.", new IllegalStateException() // log the stacktrace ); + if (response != null) { + response.touch("No Listener ;( Tfuture.set(...)"); + } } this.response = response; - return super.set(response); + if (response != null) { + response.touch("Tfuture.set(...) about to set"); + } + boolean result = super.set(response); + if (response != null) { + response.touch("Tfuture.set(...) was set"); + } + return result; } @Override @@ -124,6 +134,9 @@ public boolean setException(Throwable throwable) { @Override public void addListener(final Runnable listener, Executor exec) { + if (response != null) { + response.touch("TFuture.addListener(..)"); + } listenerCount.incrementAndGet(); // this is the current span of whomever is adding the listener - preserve it for the invocation of the latter final Span span = tracingContext != null && tracingContext.hasSpan() ? tracingContext.currentSpan() : null; @@ -133,6 +146,10 @@ public void run() { try { try { pushSpan(span); + if (response != null) { + response.touch( + "TFuture.addListener(..) - before listener run(" + listener.getClass() + "/" + listener.toString() + ")"); + } listener.run(); } finally { popSpan(span, listener); @@ -142,6 +159,9 @@ public void run() { // a) whether listener actually ran or not; // b) whether tracing was pushed/popped or not; int remainingListeners = listenerCount.decrementAndGet(); + if (response != null) { + response.touch("TFuture.addListener(..). About to release, remainingListeners:" + remainingListeners); + } if (remainingListeners <= 0) { if (response != null) { response.release(); @@ -153,6 +173,9 @@ public void run() { } private void popSpan(Span span, Runnable listener) { + if (response != null) { + response.touch("TFuture.popSpan(..) "); + } if (span != null) { try { // this _might_ fail in case the listener managed to corrupt the tracing context Span poppedSpan = tracingContext.popSpan(); @@ -172,6 +195,9 @@ private void pushSpan(Span span) { if (span != null) { tracingContext.pushSpan(span); } + if (response != null) { + response.touch("TFuture.pushSpan(..) "); + } } @Override @@ -184,6 +210,11 @@ public V get() throws InterruptedException, ExecutionException { // multiple times if INTERRUPTED, see Uninterruptibles#getUninterruptibly(java.util.concurrent.Future) listenerCount.incrementAndGet(); + if (response != null) { + response.touch( + "TFuture.get(..) - listenerCount incremented, new value :" + listenerCount.get()); + } + return result; } diff --git a/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java b/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java index 404c6c6..e28f3cc 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java @@ -126,6 +126,10 @@ public void release() { } request.release(); + + if (lastError != null) { + lastError.touch("OutRequest.release()"); + } } public boolean isUsedPeer(SocketAddress address) { @@ -141,6 +145,9 @@ public void setUsedPeer(SocketAddress address) { } public void setLastError(@NotNull ErrorResponse lastError) { + if (this.lastError != null) { + this.lastError.touch("Evicted previous last error OutRequest.setLastError(...)"); + } this.lastError = lastError; } @@ -155,6 +162,12 @@ public void setLastError(ErrorType errorType, String message) { public void setFuture(@NotNull Response response) { release(); setResponseFuture(request.getArgScheme(), response); +// if (lastError != null) { +// lastError.release(); +// } + if (lastError != null) { + lastError.touch("OutRequest.setFuture(...) left last error non-released"); + } } public void setFuture() { @@ -162,11 +175,17 @@ public void setFuture() { } public void handleResponse(@NotNull ResponseMessage response) { + if (response != null) { + response.touch("OutRequest.handleResponse(...)"); + } if (!response.isError()) { setFuture((Response) response); return; } + if (response != null) { + response.touch("OutRequest.handleResponse-setLastErrors(...)"); + } setLastError((ErrorResponse)response); // reset the read index of args for retries @@ -176,6 +195,9 @@ public void handleResponse(@NotNull ResponseMessage response) { @SuppressWarnings({"rawtypes", "unchecked"}) protected void setResponseFuture(ArgScheme argScheme, Response response) { + if (response != null) { + response.touch("OutRequest.setResponseFuture(" + argScheme + ")"); + } switch (argScheme) { case RAW: ((TFuture)future).set((RawResponse) response); From 52c14f0a07021427e6ad30bfc771c1fd42786609 Mon Sep 17 00:00:00 2001 From: Denys Kurylenko Date: Sat, 7 Mar 2020 19:03:55 -0800 Subject: [PATCH 8/8] more debug --- .../src/main/java/com/uber/tchannel/api/TFuture.java | 8 ++++---- .../main/java/com/uber/tchannel/handlers/OutRequest.java | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java b/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java index 680c1a1..3cfdfea 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/api/TFuture.java @@ -133,9 +133,10 @@ public boolean setException(Throwable throwable) { } @Override - public void addListener(final Runnable listener, Executor exec) { + public void addListener(final Runnable listener, final Executor exec) { + final String listenerAsString = "listener:" + listener.getClass() + "/" + listener.toString(); if (response != null) { - response.touch("TFuture.addListener(..)"); + response.touch("TFuture.addListener(..) "+ listenerAsString); } listenerCount.incrementAndGet(); // this is the current span of whomever is adding the listener - preserve it for the invocation of the latter @@ -147,8 +148,7 @@ public void run() { try { pushSpan(span); if (response != null) { - response.touch( - "TFuture.addListener(..) - before listener run(" + listener.getClass() + "/" + listener.toString() + ")"); + response.touch("TFuture.addListener(..) - before listener run(" + listenerAsString + ")"); } listener.run(); } finally { diff --git a/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java b/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java index e28f3cc..8304f82 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/handlers/OutRequest.java @@ -196,7 +196,8 @@ public void handleResponse(@NotNull ResponseMessage response) { @SuppressWarnings({"rawtypes", "unchecked"}) protected void setResponseFuture(ArgScheme argScheme, Response response) { if (response != null) { - response.touch("OutRequest.setResponseFuture(" + argScheme + ")"); + String requestString = request == null ? "" : this.request.toString(); + response.touch("OutRequest.setResponseFuture(" + argScheme + ", " + requestString + ")"); } switch (argScheme) { case RAW: