From f52f1de8add7bba7a9908ae2ed88cb852b237d82 Mon Sep 17 00:00:00 2001 From: moparisthebest Date: Thu, 14 Mar 2019 02:04:25 -0400 Subject: [PATCH] Re-factor and document Resolver --- .../moparisthebest/dns/listen/TcpAsync.java | 2 +- .../moparisthebest/dns/listen/UdpSync.java | 2 +- .../dns/resolve/CacheResolver.java | 8 +-- .../DelegatingQueueProcessingResolver.java | 5 +- .../moparisthebest/dns/resolve/Resolver.java | 51 ++++++++++++------- .../dns/listen/XmppListener.java | 2 +- 6 files changed, 44 insertions(+), 26 deletions(-) diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/listen/TcpAsync.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/listen/TcpAsync.java index 3f56ced..0721c92 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/listen/TcpAsync.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/listen/TcpAsync.java @@ -43,7 +43,7 @@ public class TcpAsync implements Listener { bufChan.setRequest(new Packet(bufChan.buf)); //debugPacket(bufChan.getRequest().getBuf()); - resolver.resolveAsync(bufChan).whenCompleteAsync((bc, t) -> { + resolver.resolveAsync(bufChan, executor).whenCompleteAsync((bc, t) -> { //System.out.println("got completed!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); if(t != null) { t.printStackTrace(); diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/listen/UdpSync.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/listen/UdpSync.java index 87c5f90..83944f0 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/listen/UdpSync.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/listen/UdpSync.java @@ -42,7 +42,7 @@ public class UdpSync implements Listener { //System.out.println(requestResponse); //debugPacket(requestResponse.getRequest().getBuf()); - resolver.resolveAsync(requestResponse).whenCompleteAsync((urr, t) -> { + resolver.resolveAsync(requestResponse, executor).whenCompleteAsync((urr, t) -> { if(t != null) { t.printStackTrace(); return; diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java index 8158106..12b7f61 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java @@ -105,7 +105,7 @@ public class CacheResolver implements Resolver, AutoCloseable { } @Override - public CompletableFuture resolveAsync(final E requestResponse) { + public CompletableFuture resolveAsync(final E requestResponse, final Executor executor) { final String key = calcRequestPacketKey(requestResponse.getRequest()); //System.out.println("requestPacketKey: " + key); final CachedPacket response = cache.get(key); @@ -115,7 +115,7 @@ public class CacheResolver implements Resolver, AutoCloseable { if (response.isExpired(currentTime)) { //System.out.println("cachedPacket isExpired!"); requestResponse.setRequestPacketKey(key); - final CompletableFuture request = requestAndCache(requestResponse); + final CompletableFuture request = requestAndCache(requestResponse, executor); final CompletableFuture stale = supplyAsyncOnTimeOut(scheduledExecutorService, staleResponseTimeout, TimeUnit.MILLISECONDS, () -> { requestResponse.setResponse(response.getStaleResponse().setId(requestResponse.getRequest().getId())); return requestResponse; @@ -129,7 +129,7 @@ public class CacheResolver implements Resolver, AutoCloseable { } //System.out.println("no cachedPacket, querying upstream!"); requestResponse.setRequestPacketKey(key); - return requestAndCache(requestResponse); + return requestAndCache(requestResponse, executor); /* // todo: should not have to do this, some upstreams seem to eat stuff though, figure that out, I think readTimeout fixed this final CompletableFuture request = requestAndCache(requestResponse); @@ -142,7 +142,7 @@ public class CacheResolver implements Resolver, AutoCloseable { //boolean first = true; - private CompletableFuture requestAndCache(final E requestResponse) { + private CompletableFuture requestAndCache(final E requestResponse, final Executor executor) { CompletableFuture request = new CompletableFuture<>(); requestResponse.setCompletableFuture(request); //if(first) { diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/DelegatingQueueProcessingResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/DelegatingQueueProcessingResolver.java index fd03ec1..447ae69 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/DelegatingQueueProcessingResolver.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/DelegatingQueueProcessingResolver.java @@ -3,6 +3,7 @@ package com.moparisthebest.dns.resolve; import com.moparisthebest.dns.dto.Packet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; public class DelegatingQueueProcessingResolver extends AbstractQueueProcessingResolver { @@ -14,8 +15,8 @@ public class DelegatingQueueProcessingResolver extends AbstractQueueProcessingRe } @Override - public CompletableFuture resolveAsync(final E requestResponse) { - return delegate.resolveAsync(requestResponse); + public CompletableFuture resolveAsync(final E requestResponse, final Executor executor) { + return delegate.resolveAsync(requestResponse, executor); } @Override diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/Resolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/Resolver.java index 531bca1..fd17a9d 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/Resolver.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/Resolver.java @@ -5,27 +5,44 @@ import com.moparisthebest.dns.net.ParsedUrl; import java.util.ServiceLoader; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +/** + * Implementations need to implement at least 1 of the resolve functions, either resolveAsync or resolve, the default + * implementations simply call the other one ending in a stack overflow. + * + * Ideally, implementations provide optimized versions of both the in-line and async call. + */ public interface Resolver { - default CompletableFuture resolveAsync(E requestResponse) { - try { - requestResponse.setResponse(resolve(requestResponse.getRequest())); - return CompletableFuture.completedFuture(requestResponse); - } catch (Exception e) { - final CompletableFuture ret = new CompletableFuture<>(); - ret.completeExceptionally(e); - return ret; - } - /* - return CompletableFuture.supplyAsync(() -> { - requestResponse.setResponse(resolve(requestResponse.getRequest())); - return requestResponse; - }, executor); - */ + + /** + * This must return immediately and resolve the DNS query in the background, using the given executor + * @param requestResponse + * @param executor + * @param + * @return + */ + default CompletableFuture resolveAsync(final E requestResponse, final Executor executor) { + final CompletableFuture ret = new CompletableFuture<>(); + executor.execute(() -> { + try { + requestResponse.setResponse(resolve(requestResponse.getRequest())); + ret.complete(requestResponse); + } catch (Throwable e) { + ret.completeExceptionally(e); + } + }); + return ret; } - default Packet resolve(Packet request) throws Exception { - return resolveAsync(new BaseRequestResponse(request)).get().getResponse(); + /** + * This must block on resolving the given query + * @param request + * @return + * @throws Exception + */ + default Packet resolve(final Packet request) throws Exception { + return resolveAsync(new BaseRequestResponse(request), Runnable::run).get().getResponse(); } ServiceLoader services = ServiceLoader.load(Services.class); diff --git a/xmpp-dox/src/main/java/com/moparisthebest/dns/listen/XmppListener.java b/xmpp-dox/src/main/java/com/moparisthebest/dns/listen/XmppListener.java index ada9bf7..76df72e 100644 --- a/xmpp-dox/src/main/java/com/moparisthebest/dns/listen/XmppListener.java +++ b/xmpp-dox/src/main/java/com/moparisthebest/dns/listen/XmppListener.java @@ -51,7 +51,7 @@ public class XmppListener implements Listener { //System.out.println("good request: " + req); final XmppRequestResponse requestResponse = new XmppRequestResponse(req.getFrom(), new Packet(request)); - resolver.resolveAsync(requestResponse).whenCompleteAsync((urr, t) -> { + resolver.resolveAsync(requestResponse, executor).whenCompleteAsync((urr, t) -> { if (t != null) { t.printStackTrace(); return;