Refactor BlockingQueueResolver out of CacheResolver

This commit is contained in:
Travis Burtrum 2019-04-12 23:48:03 -04:00
parent 8b90a2dab0
commit c06d3311e8
3 changed files with 41 additions and 22 deletions

View File

@ -2,6 +2,7 @@ package com.moparisthebest.dns;
import com.moparisthebest.dns.listen.Listener; import com.moparisthebest.dns.listen.Listener;
import com.moparisthebest.dns.net.ParsedUrl; import com.moparisthebest.dns.net.ParsedUrl;
import com.moparisthebest.dns.resolve.BlockingQueueResolver;
import com.moparisthebest.dns.resolve.CacheResolver; import com.moparisthebest.dns.resolve.CacheResolver;
import com.moparisthebest.dns.resolve.QueueProcessingResolver; import com.moparisthebest.dns.resolve.QueueProcessingResolver;
import com.moparisthebest.dns.resolve.Resolver; import com.moparisthebest.dns.resolve.Resolver;
@ -64,9 +65,11 @@ public class DnsProxy {
final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(40); final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(40);
final ExecutorService executor = scheduledExecutorService;//ForkJoinPool.commonPool(); final ExecutorService executor = scheduledExecutorService;//ForkJoinPool.commonPool();
final CacheResolver resolver = new CacheResolver(minTtl, staleResponseTtl, staleResponseTimeout, packetQueueLength, final CacheResolver resolver = new CacheResolver(
executor, scheduledExecutorService, cacheFile, cacheDelayMinutes) new BlockingQueueResolver(packetQueueLength).startQueueProcessingResolvers(executor, queueProcessingResolvers),
.startQueueProcessingResolvers(queueProcessingResolvers); minTtl, staleResponseTtl, staleResponseTimeout,
scheduledExecutorService, cacheFile, cacheDelayMinutes)
;
final List<Listener> listeners = Arrays.stream(config.getOrDefault("listeners", "tcp://127.0.0.1:5353 udp://127.0.0.1:5353").split("\\s+")) final List<Listener> listeners = Arrays.stream(config.getOrDefault("listeners", "tcp://127.0.0.1:5353 udp://127.0.0.1:5353").split("\\s+"))
.map(url -> Listener.ofAndStart(ParsedUrl.of(url), resolver, executor)).collect(Collectors.toList()); .map(url -> Listener.ofAndStart(ParsedUrl.of(url), resolver, executor)).collect(Collectors.toList());

View File

@ -0,0 +1,30 @@
package com.moparisthebest.dns.resolve;
import java.util.concurrent.*;
public class BlockingQueueResolver implements Resolver {
private final BlockingQueue<RequestResponse> queue;
public BlockingQueueResolver(final BlockingQueue<RequestResponse> queue) {
this.queue = queue;
}
public BlockingQueueResolver(final int packetQueueLength) {
this(packetQueueLength < 1 ? new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(packetQueueLength));
}
public Resolver startQueueProcessingResolvers(final ExecutorService executor, final Iterable<QueueProcessingResolver> queueProcessingResolvers) {
for(final QueueProcessingResolver queueProcessingResolver : queueProcessingResolvers)
queueProcessingResolver.start(executor, this.queue);
return this;
}
@Override
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) {
final CompletableFuture<E> request = new CompletableFuture<>();
requestResponse.setCompletableFuture(request);
queue.add(requestResponse);
return request;
}
}

View File

@ -17,19 +17,18 @@ public class CacheResolver implements Resolver, AutoCloseable {
private final int minTtl, staleResponseTtl; private final int minTtl, staleResponseTtl;
private final long staleResponseTimeout; private final long staleResponseTimeout;
private final BlockingQueue<RequestResponse> queue; private final Resolver delegate;
private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private final ConcurrentMap<String, CachedPacket> cache = new ConcurrentHashMap<>(); private final ConcurrentMap<String, CachedPacket> cache = new ConcurrentHashMap<>();
public CacheResolver(final int minTtl, final int staleResponseTtl, final long staleResponseTimeout, final int packetQueueLength, final ExecutorService executor, final ScheduledExecutorService scheduledExecutorService, public CacheResolver(final Resolver delegate, final int minTtl, final int staleResponseTtl, final long staleResponseTimeout, final ScheduledExecutorService scheduledExecutorService,
final String cacheFile, final long cacheDelayMinutes) throws IOException { final String cacheFile, final long cacheDelayMinutes) throws IOException {
this.delegate = delegate;
this.minTtl = minTtl; this.minTtl = minTtl;
this.staleResponseTtl = staleResponseTtl; this.staleResponseTtl = staleResponseTtl;
this.staleResponseTimeout = staleResponseTimeout; this.staleResponseTimeout = staleResponseTimeout;
this.queue = packetQueueLength < 1 ? new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(packetQueueLength);
this.executor = executor;
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
if(cacheFile != null && !cacheFile.isEmpty()) { if(cacheFile != null && !cacheFile.isEmpty()) {
final File cacheFileFile = new File(cacheFile); final File cacheFileFile = new File(cacheFile);
@ -44,12 +43,6 @@ public class CacheResolver implements Resolver, AutoCloseable {
} }
} }
public CacheResolver startQueueProcessingResolvers(final Iterable<QueueProcessingResolver> queueProcessingResolvers) {
for(final QueueProcessingResolver queueProcessingResolver : queueProcessingResolvers)
queueProcessingResolver.start(this.executor, this.queue);
return this;
}
private class CachedPacket { private class CachedPacket {
final Packet response; final Packet response;
final long receivedSeconds, expiredSeconds; final long receivedSeconds, expiredSeconds;
@ -135,15 +128,8 @@ public class CacheResolver implements Resolver, AutoCloseable {
*/ */
} }
//boolean first = true;
private <E extends RequestResponse> CompletableFuture<E> requestAndCache(final E requestResponse, final Executor executor) { private <E extends RequestResponse> CompletableFuture<E> requestAndCache(final E requestResponse, final Executor executor) {
CompletableFuture<E> request = new CompletableFuture<>(); CompletableFuture<E> request = delegate.resolveAsync(requestResponse, executor);
requestResponse.setCompletableFuture(request);
//if(first) {
//first = false;
queue.add(requestResponse);
//}
if(minTtl > 0) { if(minTtl > 0) {
request = request.thenApply(s -> { request = request.thenApply(s -> {
s.getResponse().modTtls((ttl) -> Math.max(ttl, minTtl)); s.getResponse().modTtls((ttl) -> Math.max(ttl, minTtl));