diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/DnsProxy.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/DnsProxy.java index 88c2e48..9cee995 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/DnsProxy.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/DnsProxy.java @@ -48,25 +48,29 @@ public class DnsProxy { final String cacheFile = config.get("cacheFile"); final long cacheDelayMinutes = Long.parseLong(config.getOrDefault("cacheDelayMinutes", "60")); - final String[] resolvers = config.getOrDefault("resolvers", "https://dns.google.com/experimental?ct#name=dns.google.com").split("\\s+"); - if (!config.containsKey("maxRetries")) - config.put("maxRetries", String.valueOf(resolvers.length * 2)); + final String[] resolverUrls = config.getOrDefault("resolvers", "https://dns.google.com/experimental?ct#name=dns.google.com").split("\\s+"); - //System.out.println("resolvers: " + Arrays.toString(resolvers)); + final int maxRetries = Integer.parseInt(config.getOrDefault("maxRetries", String.valueOf(resolverUrls.length * 2))); - final List queueProcessingResolvers = Arrays.stream(resolvers).map(s -> ParsedUrl.of(s, config)).map(QueueProcessingResolver::of).collect(Collectors.toList()); - //final List queueProcessingResolvers = new ArrayList<>(); - //queueProcessingResolvers.add(new SocketResolver(5, "socket1", SocketFactory.getDefault(), new InetSocketAddress("8.8.4.4", 53))); - //queueProcessingResolvers.add(new HttpResolver(5, "http1", "https://dns.google.com/experimental?ct")); + //System.out.println("resolverUrls: " + Arrays.toString(resolverUrls)); + + final List resolvers = Arrays.stream(resolverUrls).map(s -> ParsedUrl.of(s, config)).map(Resolver::of).collect(Collectors.toList()); + //final List resolvers = new ArrayList<>(); + //resolvers.add(new SocketResolver(5, "socket1", SocketFactory.getDefault(), new InetSocketAddress("8.8.4.4", 53))); + //resolvers.add(new HttpResolver(5, "http1", "https://dns.google.com/experimental?ct")); final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(40); final ExecutorService executor = scheduledExecutorService;//ForkJoinPool.commonPool(); + /* + final Resolver multiResolver = MultiResolver.of(packetQueueLength, executor, resolvers); + final Resolver resolver = RetryResolver.of(maxRetries, multiResolver); + */ + + final Resolver multiResolver = MultiResolver.of(packetQueueLength, executor, resolvers); + final CacheResolver resolver = new CacheResolver( - MapResolver.minTtl(minTtl, - new BlockingQueueResolver(packetQueueLength) - .startQueueProcessingResolvers(executor, queueProcessingResolvers) - ), + MapResolver.minTtl(minTtl, RetryResolver.of(maxRetries, multiResolver)), staleResponseTtl, staleResponseTimeout, scheduledExecutorService, cacheFile, cacheDelayMinutes) ; @@ -84,9 +88,10 @@ public class DnsProxy { //if(true) return; executor.shutdown(); scheduledExecutorService.shutdown(); - queueProcessingResolvers.forEach(Util::tryClose); listeners.forEach(Util::tryClose); tryClose(resolver); + tryClose(multiResolver); + resolvers.forEach(Util::tryClose); System.out.println("shutdown complete"); }); diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/AbstractQueueProcessingResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/AbstractQueueProcessingResolver.java deleted file mode 100644 index 97a65a3..0000000 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/AbstractQueueProcessingResolver.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.moparisthebest.dns.resolve; - -import com.moparisthebest.dns.dto.Packet; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; - -public abstract class AbstractQueueProcessingResolver implements QueueProcessingResolver { - - protected final int maxRetries; - protected final String name; - - protected ExecutorService executor; - protected BlockingQueue queue; - private boolean running = false; - private Thread thisThread = null; - - public AbstractQueueProcessingResolver(final int maxRetries, final String name) { - this.maxRetries = maxRetries; - this.name = name; - } - - @Override - public void start(final ExecutorService executor, final BlockingQueue queue) { - this.executor = executor; - this.queue = queue; - this.running = true; - executor.execute(this); - } - - @Override - public void run() { - thisThread = Thread.currentThread(); - if (running) - try { - //System.err.println(name + " getting from queue"); - final RequestResponse requestResponse = queue.take(); - //System.err.println(name + " got from queue"); - Packet response = null; - try { - response = resolve(requestResponse.getRequest()); - } catch (Exception e) { - //e.printStackTrace(); - System.err.println("FAILURE: " + name + ": " + e.getMessage()); - } - - if(response == null) { - // failed - if (requestResponse.getAndIncrementFailureCount() < maxRetries) { - //System.err.println(name + " putting in queue"); - queue.put(requestResponse); - //System.err.println(name + " put in queue"); - } else { - //System.err.println(name + " maxRetries reached SRVFAIL"); - @SuppressWarnings("unchecked") final CompletableFuture cf = (CompletableFuture) requestResponse.getCompletableFuture(); - cf.completeExceptionally(new Exception("SRVFAIL")); - } - } else { - requestResponse.setResponse(response); - //System.err.println(name + " got response: " + requestResponse.getResponse()); - @SuppressWarnings("unchecked") final CompletableFuture cf = (CompletableFuture) requestResponse.getCompletableFuture(); - //System.err.println(name + " completed: " + cf.complete(requestResponse)); - cf.complete(requestResponse); - } - } catch (InterruptedException e) { - throw new RuntimeException("socketresolver take", e); - } finally { - if (running) - executor.execute(this); - } - } - - @Override - public void close() { - running = false; - if (thisThread != null) - thisThread.interrupt(); - } -} diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BaseRequestResponse.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BaseRequestResponse.java index 90e6b4f..a12ba09 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BaseRequestResponse.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BaseRequestResponse.java @@ -11,9 +11,7 @@ public class BaseRequestResponse implements RequestResponse { private Packet request, response; - private CompletableFuture completableFuture; private String requestPacketKey; - private int failureCount; public BaseRequestResponse() { } @@ -41,16 +39,6 @@ public class BaseRequestResponse implements RequestResponse { this.response = response; } - @Override - public CompletableFuture getCompletableFuture() { - return completableFuture; - } - - @Override - public void setCompletableFuture(final CompletableFuture completableFuture) { - this.completableFuture = completableFuture; - } - @Override public String getRequestPacketKey() { return requestPacketKey; @@ -61,19 +49,12 @@ public class BaseRequestResponse implements RequestResponse { this.requestPacketKey = requestPacketKey; } - @Override - public final int getAndIncrementFailureCount() { - return ++failureCount; - } - @Override public String toString() { return "BaseRequestResponse{" + "request=" + request + ", response=" + response + - ", completableFuture=" + completableFuture + ", requestPacketKey=" + requestPacketKey + - ", failureCount=" + failureCount + '}'; } } diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BlockingQueueResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BlockingQueueResolver.java index a067ef5..e5a6976 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BlockingQueueResolver.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BlockingQueueResolver.java @@ -1,30 +1,38 @@ package com.moparisthebest.dns.resolve; +import com.moparisthebest.dns.Util; + +import java.util.Collection; +import java.util.List; import java.util.concurrent.*; +import java.util.stream.Collectors; -public class BlockingQueueResolver implements Resolver { +public class BlockingQueueResolver implements MultiResolver { - private final BlockingQueue queue; + private final BlockingQueue> queue; - public BlockingQueueResolver(final BlockingQueue queue) { + private final List queueProcessingResolvers; + + public BlockingQueueResolver(final BlockingQueue> queue, final ExecutorService executor, final Collection delegates) { this.queue = queue; + if (delegates.isEmpty()) + throw new IllegalArgumentException("must supply at least 1 resolver"); + queueProcessingResolvers = delegates.stream().map(resolver -> new QueueProcessingResolver(resolver, executor, this.queue)).collect(Collectors.toList()); } - public BlockingQueueResolver(final int packetQueueLength) { - this(packetQueueLength < 1 ? new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(packetQueueLength)); - } - - public Resolver startQueueProcessingResolvers(final ExecutorService executor, final Iterable queueProcessingResolvers) { - for(final QueueProcessingResolver queueProcessingResolver : queueProcessingResolvers) - queueProcessingResolver.start(executor, this.queue); - return this; + public BlockingQueueResolver(final int packetQueueLength, final ExecutorService executor, final Collection delegates) { + this(packetQueueLength < 1 ? new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(packetQueueLength), executor, delegates); } @Override public CompletableFuture resolveAsync(final E requestResponse, final Executor executor) { - final CompletableFuture request = new CompletableFuture<>(); - requestResponse.setCompletableFuture(request); - queue.add(requestResponse); + final RequestResponseCompletableFuture request = new RequestResponseCompletableFuture<>(requestResponse); + queue.add(request); return request; } + + @Override + public void close() { + queueProcessingResolvers.forEach(Util::tryClose); + } } diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java index 38f35de..6573139 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java @@ -12,20 +12,18 @@ import java.util.concurrent.*; import static com.moparisthebest.dns.Util.supplyAsyncOnTimeOut; -public class CacheResolver implements Resolver, AutoCloseable { +public class CacheResolver extends WrappingResolver { private final int staleResponseTtl; private final long staleResponseTimeout; - private final Resolver delegate; - private final ScheduledExecutorService scheduledExecutorService; private final ConcurrentMap cache = new ConcurrentHashMap<>(); public CacheResolver(final Resolver delegate, final int staleResponseTtl, final long staleResponseTimeout, final ScheduledExecutorService scheduledExecutorService, final String cacheFile, final long cacheDelayMinutes) throws IOException { - this.delegate = delegate; + super(delegate); this.staleResponseTtl = staleResponseTtl; this.staleResponseTimeout = staleResponseTimeout; this.scheduledExecutorService = scheduledExecutorService; diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/MapResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/MapResolver.java index 4c34ccb..9cbbdd9 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/MapResolver.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/MapResolver.java @@ -6,9 +6,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.Function; -public class MapResolver implements Resolver { +public class MapResolver extends WrappingResolver { - private final Resolver delegate; private final Function mapper; public static Resolver minTtl(final int minTtl, final Resolver delegate) { @@ -17,7 +16,7 @@ public class MapResolver implements Resolver { } private MapResolver(final Resolver delegate, final Function mapper) { - this.delegate = delegate; + super(delegate); this.mapper = mapper; } diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/MultiResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/MultiResolver.java new file mode 100644 index 0000000..ff73af3 --- /dev/null +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/MultiResolver.java @@ -0,0 +1,13 @@ +package com.moparisthebest.dns.resolve; + +import java.util.Collection; +import java.util.concurrent.ExecutorService; + +public interface MultiResolver extends Resolver { + + public static MultiResolver of(final int packetQueueLength, final ExecutorService executor, final Collection delegates) { + return packetQueueLength < 0 ? + new RandomUpstreamResolver(delegates) : + new BlockingQueueResolver(packetQueueLength, executor, delegates); + } +} diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/QueueProcessingResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/QueueProcessingResolver.java index 459e065..0890b53 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/QueueProcessingResolver.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/QueueProcessingResolver.java @@ -1,27 +1,69 @@ package com.moparisthebest.dns.resolve; -import com.moparisthebest.dns.net.ParsedUrl; +import com.moparisthebest.dns.dto.Packet; -import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -public interface QueueProcessingResolver extends Resolver, Runnable, AutoCloseable { - void start(final ExecutorService executor, final BlockingQueue queue); +public class QueueProcessingResolver extends WrappingResolver implements Runnable { - public static QueueProcessingResolver of(final String resolver, final Map upperLevelProps) { - return of(ParsedUrl.of(resolver, upperLevelProps)); + private final ExecutorService executor; + private final BlockingQueue> queue; + + private boolean running = false; + private Thread thisThread = null; + + public QueueProcessingResolver(final Resolver delegate, final ExecutorService executor, final BlockingQueue> queue) { + super(delegate); + this.executor = executor; + this.queue = queue; + this.running = true; + executor.execute(this); } - public static QueueProcessingResolver of(final String resolver) { - return of(ParsedUrl.of(resolver)); + @Override + public void run() { + thisThread = Thread.currentThread(); + if (running) + try { + //System.err.println(name + " getting from queue"); + @SuppressWarnings("unchecked") + final RequestResponseCompletableFuture cf = (RequestResponseCompletableFuture) queue.take(); + final RequestResponse requestResponse = cf.getRequestResponse(); + //System.err.println(name + " got from queue"); + Packet response = null; + Throwable resolveException = null; + try { + response = resolve(requestResponse.getRequest()); + } catch (Throwable e) { + //e.printStackTrace(); + //System.err.println("FAILURE: " + name + ": " + e.getMessage()); + resolveException = e; + } + //resolveAsync(requestResponse, executor).get(); + + if(response == null) { + // failed + cf.completeExceptionally(resolveException == null ? new Exception("SRVFAIL") : resolveException); + } else { + requestResponse.setResponse(response); + //System.err.println(name + " got response: " + requestResponse.getResponse()); + //System.err.println(name + " completed: " + cf.complete(requestResponse)); + cf.complete(requestResponse); + } + } catch (InterruptedException e) { + throw new RuntimeException("socketresolver take", e); + } finally { + if (running) + executor.execute(this); + } } - public static QueueProcessingResolver of(final ParsedUrl parsedUrl) { - final int maxRetries = Integer.parseInt(parsedUrl.getProps().getOrDefault("maxRetries", "5")); - String name = parsedUrl.getProps().get("name"); - if(name == null) - name = parsedUrl.getUri().toString(); - return new DelegatingQueueProcessingResolver(maxRetries, name, Resolver.of(parsedUrl)); + @Override + public void close() { + running = false; + if (thisThread != null) + thisThread.interrupt(); } } diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RandomUpstreamResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RandomUpstreamResolver.java index 5d3c432..94192d3 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RandomUpstreamResolver.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RandomUpstreamResolver.java @@ -2,11 +2,12 @@ package com.moparisthebest.dns.resolve; import com.moparisthebest.dns.dto.Packet; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; -public class RandomUpstreamResolver implements Resolver { +public class RandomUpstreamResolver implements MultiResolver { private final Resolver[] delegates; @@ -14,6 +15,10 @@ public class RandomUpstreamResolver implements Resolver { this.delegates = delegates; } + public RandomUpstreamResolver(final Collection delegates) { + this.delegates = delegates.toArray(new Resolver[0]); + } + public Resolver random() { return delegates[ThreadLocalRandom.current().nextInt(delegates.length)]; } diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RequestResponse.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RequestResponse.java index 3e0a4c0..bc2d7de 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RequestResponse.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RequestResponse.java @@ -11,14 +11,7 @@ public interface RequestResponse { /** * These should only be used by resolvers, may be null - CompletableFuture getCompletableFuture(); - void setCompletableFuture(CompletableFuture completableFuture); - CompletableFuture getCompletableFuture(); - void setCompletableFuture(CompletableFuture completableFuture); */ - CompletableFuture getCompletableFuture(); - void setCompletableFuture(CompletableFuture completableFuture); String getRequestPacketKey(); void setRequestPacketKey(String key); - int getAndIncrementFailureCount(); } diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RequestResponseCompletableFuture.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RequestResponseCompletableFuture.java new file mode 100644 index 0000000..d4b9d3e --- /dev/null +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RequestResponseCompletableFuture.java @@ -0,0 +1,16 @@ +package com.moparisthebest.dns.resolve; + +import java.util.concurrent.CompletableFuture; + +public class RequestResponseCompletableFuture extends CompletableFuture { + + private final E requestResponse; + + public RequestResponseCompletableFuture(final E requestResponse) { + this.requestResponse = requestResponse; + } + + public E getRequestResponse() { + return requestResponse; + } +} diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RetryResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RetryResolver.java new file mode 100644 index 0000000..cb59c2e --- /dev/null +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/RetryResolver.java @@ -0,0 +1,53 @@ +package com.moparisthebest.dns.resolve; + +import com.moparisthebest.dns.dto.Packet; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +public class RetryResolver extends WrappingResolver { + + protected final int maxRetries; + + public static Resolver of(final int maxRetries, final Resolver delegate) { + // anything less than 1 just don't wrap + return maxRetries < 1 ? delegate : new RetryResolver(delegate, maxRetries); + } + + private RetryResolver(final Resolver delegate, final int maxRetries) { + super(delegate); + this.maxRetries = maxRetries; + } + + @Override + public CompletableFuture resolveAsync(final E requestResponse, final Executor executor) { + // todo: better async way to do this? + final CompletableFuture ret = new CompletableFuture<>(); + executor.execute(() -> { + try { + requestResponse.setResponse(resolve(requestResponse.getRequest())); + ret.complete(requestResponse); + } catch (Throwable e) { + ret.completeExceptionally(e); + } + }); + return ret; + } + + @Override + public Packet resolve(final Packet request) throws Exception { + for(int x = 0; x < maxRetries; ++x) { + try { + final Packet response = super.resolve(request); + if(response != null) + return response; + } catch (Exception e) { + e.printStackTrace(); + //System.err.println("FAILURE: " + name + ": " + e.getMessage()); + } + } + throw new Exception("SRVFAIL"); + } +} diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/DelegatingQueueProcessingResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/WrappingResolver.java similarity index 60% rename from jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/DelegatingQueueProcessingResolver.java rename to jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/WrappingResolver.java index c0a5f5e..2e266ad 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/DelegatingQueueProcessingResolver.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/WrappingResolver.java @@ -2,15 +2,16 @@ package com.moparisthebest.dns.resolve; import com.moparisthebest.dns.dto.Packet; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -public class DelegatingQueueProcessingResolver extends AbstractQueueProcessingResolver { +public abstract class WrappingResolver implements Resolver { - private final Resolver delegate; + protected final Resolver delegate; - public DelegatingQueueProcessingResolver(final int maxRetries, final String name, final Resolver delegate) { - super(maxRetries, name); + public WrappingResolver(final Resolver delegate) { + Objects.requireNonNull(delegate); this.delegate = delegate; } @@ -24,9 +25,5 @@ public class DelegatingQueueProcessingResolver extends AbstractQueueProcessingRe return delegate.resolve(request); } - @Override - public void close() { - super.close(); - delegate.close(); - } + // we don't call delegate.close() on purpose, whatever created it should close it }