diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/DnsProxy.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/DnsProxy.java index 353112a..6d8db1a 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/DnsProxy.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/DnsProxy.java @@ -2,6 +2,7 @@ package com.moparisthebest.dns; import com.moparisthebest.dns.listen.Listener; import com.moparisthebest.dns.net.ParsedUrl; +import com.moparisthebest.dns.resolve.BlockingQueueResolver; import com.moparisthebest.dns.resolve.CacheResolver; import com.moparisthebest.dns.resolve.QueueProcessingResolver; import com.moparisthebest.dns.resolve.Resolver; @@ -64,9 +65,11 @@ public class DnsProxy { final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(40); final ExecutorService executor = scheduledExecutorService;//ForkJoinPool.commonPool(); - final CacheResolver resolver = new CacheResolver(minTtl, staleResponseTtl, staleResponseTimeout, packetQueueLength, - executor, scheduledExecutorService, cacheFile, cacheDelayMinutes) - .startQueueProcessingResolvers(queueProcessingResolvers); + final CacheResolver resolver = new CacheResolver( + new BlockingQueueResolver(packetQueueLength).startQueueProcessingResolvers(executor, queueProcessingResolvers), + minTtl, staleResponseTtl, staleResponseTimeout, + scheduledExecutorService, cacheFile, cacheDelayMinutes) + ; final List listeners = Arrays.stream(config.getOrDefault("listeners", "tcp://127.0.0.1:5353 udp://127.0.0.1:5353").split("\\s+")) .map(url -> Listener.ofAndStart(ParsedUrl.of(url), resolver, executor)).collect(Collectors.toList()); diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BlockingQueueResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BlockingQueueResolver.java new file mode 100644 index 0000000..a067ef5 --- /dev/null +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/BlockingQueueResolver.java @@ -0,0 +1,30 @@ +package com.moparisthebest.dns.resolve; + +import java.util.concurrent.*; + +public class BlockingQueueResolver implements Resolver { + + private final BlockingQueue queue; + + public BlockingQueueResolver(final BlockingQueue queue) { + this.queue = queue; + } + + public BlockingQueueResolver(final int packetQueueLength) { + this(packetQueueLength < 1 ? new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(packetQueueLength)); + } + + public Resolver startQueueProcessingResolvers(final ExecutorService executor, final Iterable queueProcessingResolvers) { + for(final QueueProcessingResolver queueProcessingResolver : queueProcessingResolvers) + queueProcessingResolver.start(executor, this.queue); + return this; + } + + @Override + public CompletableFuture resolveAsync(final E requestResponse, final Executor executor) { + final CompletableFuture request = new CompletableFuture<>(); + requestResponse.setCompletableFuture(request); + queue.add(requestResponse); + return request; + } +} diff --git a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java index 294bd2b..698f2cd 100644 --- a/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java +++ b/jDnsProxy/src/main/java/com/moparisthebest/dns/resolve/CacheResolver.java @@ -17,19 +17,18 @@ public class CacheResolver implements Resolver, AutoCloseable { private final int minTtl, staleResponseTtl; private final long staleResponseTimeout; - private final BlockingQueue queue; - private final ExecutorService executor; + private final Resolver delegate; + private final ScheduledExecutorService scheduledExecutorService; private final ConcurrentMap cache = new ConcurrentHashMap<>(); - public CacheResolver(final int minTtl, final int staleResponseTtl, final long staleResponseTimeout, final int packetQueueLength, final ExecutorService executor, final ScheduledExecutorService scheduledExecutorService, + public CacheResolver(final Resolver delegate, final int minTtl, final int staleResponseTtl, final long staleResponseTimeout, final ScheduledExecutorService scheduledExecutorService, final String cacheFile, final long cacheDelayMinutes) throws IOException { + this.delegate = delegate; this.minTtl = minTtl; this.staleResponseTtl = staleResponseTtl; this.staleResponseTimeout = staleResponseTimeout; - this.queue = packetQueueLength < 1 ? new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(packetQueueLength); - this.executor = executor; this.scheduledExecutorService = scheduledExecutorService; if(cacheFile != null && !cacheFile.isEmpty()) { final File cacheFileFile = new File(cacheFile); @@ -44,12 +43,6 @@ public class CacheResolver implements Resolver, AutoCloseable { } } - public CacheResolver startQueueProcessingResolvers(final Iterable queueProcessingResolvers) { - for(final QueueProcessingResolver queueProcessingResolver : queueProcessingResolvers) - queueProcessingResolver.start(this.executor, this.queue); - return this; - } - private class CachedPacket { final Packet response; final long receivedSeconds, expiredSeconds; @@ -135,15 +128,8 @@ public class CacheResolver implements Resolver, AutoCloseable { */ } - //boolean first = true; - private CompletableFuture requestAndCache(final E requestResponse, final Executor executor) { - CompletableFuture request = new CompletableFuture<>(); - requestResponse.setCompletableFuture(request); - //if(first) { - //first = false; - queue.add(requestResponse); - //} + CompletableFuture request = delegate.resolveAsync(requestResponse, executor); if(minTtl > 0) { request = request.thenApply(s -> { s.getResponse().modTtls((ttl) -> Math.max(ttl, minTtl));