-
Notifications
You must be signed in to change notification settings - Fork 259
patch to support mDNS and multicast to servers #148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
504fe0d
fce0cf8
544182b
855b7be
1cf8fb8
cc916e5
e838df6
92d98d5
80aa3cd
31f2621
c1ce27b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| import java.io.EOFException; | ||
| import java.io.IOException; | ||
| import java.net.InetSocketAddress; | ||
| import java.net.SocketAddress; | ||
| import java.net.SocketException; | ||
| import java.net.SocketTimeoutException; | ||
| import java.nio.ByteBuffer; | ||
|
|
@@ -12,7 +13,10 @@ | |
| import java.nio.channels.Selector; | ||
| import java.security.SecureRandom; | ||
| import java.time.Duration; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Queue; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
|
|
@@ -71,8 +75,7 @@ private static void checkTransactionTimeouts() { | |
| for (Iterator<Transaction> it = pendingTransactions.iterator(); it.hasNext(); ) { | ||
| Transaction t = it.next(); | ||
| if (t.endTime - System.nanoTime() < 0) { | ||
| t.silentCloseChannel(); | ||
| t.f.completeExceptionally(new SocketTimeoutException("Query timed out")); | ||
| t.closeTransaction(); | ||
| it.remove(); | ||
| } | ||
| } | ||
|
|
@@ -81,19 +84,16 @@ private static void checkTransactionTimeouts() { | |
| @RequiredArgsConstructor | ||
| private static class Transaction implements KeyProcessor { | ||
| private final byte[] data; | ||
| private final int max; | ||
| final int max; | ||
| private final long endTime; | ||
| private final DatagramChannel channel; | ||
| private final CompletableFuture<byte[]> f; | ||
| private final SocketAddress remoteSocketAddress; | ||
| final CompletableFuture<List<byte[]>> f; | ||
|
|
||
| void send() throws IOException { | ||
| ByteBuffer buffer = ByteBuffer.wrap(data); | ||
| verboseLog( | ||
| "UDP write", | ||
| channel.socket().getLocalSocketAddress(), | ||
| channel.socket().getRemoteSocketAddress(), | ||
| data); | ||
| int n = channel.send(buffer, channel.socket().getRemoteSocketAddress()); | ||
| verboseLog("UDP write", channel.socket().getLocalSocketAddress(), remoteSocketAddress, data); | ||
| int n = channel.send(buffer, remoteSocketAddress); | ||
| if (n <= 0) { | ||
| throw new EOFException(); | ||
| } | ||
|
|
@@ -109,10 +109,12 @@ public void processReadyKey(SelectionKey key) { | |
|
|
||
| DatagramChannel channel = (DatagramChannel) key.channel(); | ||
| ByteBuffer buffer = ByteBuffer.allocate(max); | ||
| SocketAddress source; | ||
| int read; | ||
| try { | ||
| read = channel.read(buffer); | ||
| if (read <= 0) { | ||
| source = channel.receive(buffer); | ||
| read = buffer.position(); | ||
| if (read <= 0 || source == null) { | ||
| throw new EOFException(); | ||
| } | ||
| } catch (IOException e) { | ||
|
|
@@ -125,29 +127,87 @@ public void processReadyKey(SelectionKey key) { | |
| buffer.flip(); | ||
| byte[] data = new byte[read]; | ||
| System.arraycopy(buffer.array(), 0, data, 0, read); | ||
| verboseLog( | ||
| "UDP read", | ||
| channel.socket().getLocalSocketAddress(), | ||
| channel.socket().getRemoteSocketAddress(), | ||
| data); | ||
| verboseLog("UDP read", channel.socket().getLocalSocketAddress(), source, data); | ||
| silentCloseChannel(); | ||
| f.complete(data); | ||
| f.complete(Collections.singletonList(data)); | ||
| pendingTransactions.remove(this); | ||
| } | ||
|
|
||
| private void silentCloseChannel() { | ||
| void silentCloseChannel() { | ||
| try { | ||
| channel.disconnect(); | ||
| channel.close(); | ||
| } catch (IOException e) { | ||
| // ignore, we either already have everything we need or can't do anything | ||
| } | ||
| } | ||
|
|
||
| void closeTransaction() { | ||
| silentCloseChannel(); | ||
| f.completeExceptionally(new SocketTimeoutException("Query timed out")); | ||
| } | ||
| } | ||
|
|
||
| private static class MultiAnswerTransaction extends Transaction { | ||
| MultiAnswerTransaction( | ||
| byte[] query, | ||
| int max, | ||
| long endTime, | ||
| DatagramChannel channel, | ||
| SocketAddress remoteSocketAddress, | ||
| CompletableFuture<List<byte[]>> f) { | ||
| super(query, max, endTime, channel, remoteSocketAddress, f); | ||
| } | ||
|
|
||
| public void processReadyKey(SelectionKey key) { | ||
| if (!key.isReadable()) { | ||
| silentCloseChannel(); | ||
| f.completeExceptionally(new EOFException("channel not readable")); | ||
| pendingTransactions.remove(this); | ||
| return; | ||
| } | ||
|
|
||
| DatagramChannel channel = (DatagramChannel) key.channel(); | ||
| ByteBuffer buffer = ByteBuffer.allocate(max); | ||
| SocketAddress source; | ||
| int read; | ||
| try { | ||
| source = channel.receive(buffer); | ||
| read = buffer.position(); | ||
| if (read <= 0 || source == null) { | ||
| return; // ignore this datagram | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a lot of code duplication. The if (remoteSocketAddress.getAddress().isMulticast()) {
return;
}
throw new EOFException(); |
||
| } | ||
| } catch (IOException e) { | ||
| silentCloseChannel(); | ||
| f.completeExceptionally(e); | ||
| pendingTransactions.remove(this); | ||
| return; | ||
| } | ||
|
|
||
| buffer.flip(); | ||
| byte[] data = new byte[read]; | ||
| System.arraycopy(buffer.array(), 0, data, 0, read); | ||
| verboseLog("UDP read", channel.socket().getLocalSocketAddress(), source, data); | ||
| answers.add(data); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the only other line in this method that differs for mDNS. Instead of duplicating everything, move the processing of the data byte-array into a separate method, e.g. // in Transaction:
...
System.arraycopy(buffer.array(), 0, data, 0, read);
verboseLog("UDP read", channel.socket().getLocalSocketAddress(), source, data);
processAnswer(data);
}
processAnswer(byte[] data) {
silentCloseChannel();
f.complete(Collections.singletonList(data));
pendingTransactions.remove(this);
}
// in MultiAnswerTransaction:
processAnswer(byte[] data) {
answers.add(data);
} |
||
| } | ||
|
|
||
| private ArrayList<byte[]> answers = new ArrayList<>(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move this to the top of the class definition and make it |
||
|
|
||
| @Override | ||
| void closeTransaction() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. closeTransaction is only triggered when the timeout has expired. Is this really the only condition on which you want to return answers? What about the case where one is only interested in the first answer, as fast as possible? The timeout processing currently only runs every second. Is that still sufficient? |
||
| if (answers.size() > 0) { | ||
| silentCloseChannel(); | ||
| f.complete(answers); | ||
| } else { | ||
| // we failed, no answers | ||
| super.closeTransaction(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| static CompletableFuture<byte[]> sendrecv( | ||
| static CompletableFuture<List<byte[]>> sendrecv( | ||
| InetSocketAddress local, InetSocketAddress remote, byte[] data, int max, Duration timeout) { | ||
| CompletableFuture<byte[]> f = new CompletableFuture<>(); | ||
| CompletableFuture<List<byte[]>> f = new CompletableFuture<>(); | ||
| try { | ||
| final Selector selector = selector(); | ||
| DatagramChannel channel = DatagramChannel.open(); | ||
|
|
@@ -169,6 +229,9 @@ static CompletableFuture<byte[]> sendrecv( | |
|
|
||
| addr = new InetSocketAddress(local.getAddress(), port); | ||
| } | ||
| if (addr.getPort() == SimpleResolver.RESERVED_MDNS_PORT) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kind of disagree with the statement that "port 5353 is valid in normal DNS". Since mDNS is becoming more prevalent, it would be recommended not to use that port accidentally to avoid receiving requests from other hosts when you're expecting replies. And it doesn't hurt to avoid it, since this part of the code no longer differentiates between receiving DNS and mDNS replies.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In regular DNS the socket is bound, so the only replies accepted are from a DNS server. The case shouldn't occur when using defaults though. But |
||
| continue; // can't use the mDNS server port, try again | ||
| } | ||
|
|
||
| channel.bind(addr); | ||
| bound = true; | ||
|
|
@@ -185,9 +248,15 @@ static CompletableFuture<byte[]> sendrecv( | |
| } | ||
| } | ||
|
|
||
| channel.connect(remote); | ||
| long endTime = System.nanoTime() + timeout.toNanos(); | ||
| Transaction t = new Transaction(data, max, endTime, channel, f); | ||
| Transaction t; | ||
| if (!remote.getAddress().isMulticastAddress()) { | ||
| channel.connect(remote); | ||
| t = new Transaction(data, max, endTime, channel, remote, f); | ||
| } else { | ||
| // stop this a little before the timeout so we can report what answers we did get | ||
| t = new MultiAnswerTransaction(data, max, endTime - 1000000000L, channel, remote, f); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make the behavior between DNS and mDNS consistent with this substraction: either substract in both cases, or in neither. And I think this is a rather high threshold, processing the answers once they're read from the network is really fast (0 to 1ms on my machein), so |
||
| } | ||
| pendingTransactions.add(t); | ||
| registrationQueue.add(t); | ||
| selector.wakeup(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> UNICAST_RESPONSE_FLAG
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, in next request.