First major refactor

This commit is contained in:
Travis Burtrum 2019-04-14 00:55:55 -04:00
parent c7d8712ab5
commit fa9e2b3048
13 changed files with 194 additions and 164 deletions

View File

@ -48,25 +48,29 @@ public class DnsProxy {
final String cacheFile = config.get("cacheFile");
final long cacheDelayMinutes = Long.parseLong(config.getOrDefault("cacheDelayMinutes", "60"));
final String[] resolvers = config.getOrDefault("resolvers", "https://dns.google.com/experimental?ct#name=dns.google.com").split("\\s+");
if (!config.containsKey("maxRetries"))
config.put("maxRetries", String.valueOf(resolvers.length * 2));
final String[] resolverUrls = config.getOrDefault("resolvers", "https://dns.google.com/experimental?ct#name=dns.google.com").split("\\s+");
//System.out.println("resolvers: " + Arrays.toString(resolvers));
final int maxRetries = Integer.parseInt(config.getOrDefault("maxRetries", String.valueOf(resolverUrls.length * 2)));
final List<QueueProcessingResolver> queueProcessingResolvers = Arrays.stream(resolvers).map(s -> ParsedUrl.of(s, config)).map(QueueProcessingResolver::of).collect(Collectors.toList());
//final List<QueueProcessingResolver> queueProcessingResolvers = new ArrayList<>();
//queueProcessingResolvers.add(new SocketResolver(5, "socket1", SocketFactory.getDefault(), new InetSocketAddress("8.8.4.4", 53)));
//queueProcessingResolvers.add(new HttpResolver(5, "http1", "https://dns.google.com/experimental?ct"));
//System.out.println("resolverUrls: " + Arrays.toString(resolverUrls));
final List<Resolver> resolvers = Arrays.stream(resolverUrls).map(s -> ParsedUrl.of(s, config)).map(Resolver::of).collect(Collectors.toList());
//final List<QueueProcessingResolver> resolvers = new ArrayList<>();
//resolvers.add(new SocketResolver(5, "socket1", SocketFactory.getDefault(), new InetSocketAddress("8.8.4.4", 53)));
//resolvers.add(new HttpResolver(5, "http1", "https://dns.google.com/experimental?ct"));
final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(40);
final ExecutorService executor = scheduledExecutorService;//ForkJoinPool.commonPool();
/*
final Resolver multiResolver = MultiResolver.of(packetQueueLength, executor, resolvers);
final Resolver resolver = RetryResolver.of(maxRetries, multiResolver);
*/
final Resolver multiResolver = MultiResolver.of(packetQueueLength, executor, resolvers);
final CacheResolver resolver = new CacheResolver(
MapResolver.minTtl(minTtl,
new BlockingQueueResolver(packetQueueLength)
.startQueueProcessingResolvers(executor, queueProcessingResolvers)
),
MapResolver.minTtl(minTtl, RetryResolver.of(maxRetries, multiResolver)),
staleResponseTtl, staleResponseTimeout,
scheduledExecutorService, cacheFile, cacheDelayMinutes)
;
@ -84,9 +88,10 @@ public class DnsProxy {
//if(true) return;
executor.shutdown();
scheduledExecutorService.shutdown();
queueProcessingResolvers.forEach(Util::tryClose);
listeners.forEach(Util::tryClose);
tryClose(resolver);
tryClose(multiResolver);
resolvers.forEach(Util::tryClose);
System.out.println("shutdown complete");
});

View File

@ -1,80 +0,0 @@
package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.dto.Packet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
public abstract class AbstractQueueProcessingResolver implements QueueProcessingResolver {
protected final int maxRetries;
protected final String name;
protected ExecutorService executor;
protected BlockingQueue<RequestResponse> queue;
private boolean running = false;
private Thread thisThread = null;
public AbstractQueueProcessingResolver(final int maxRetries, final String name) {
this.maxRetries = maxRetries;
this.name = name;
}
@Override
public void start(final ExecutorService executor, final BlockingQueue<RequestResponse> queue) {
this.executor = executor;
this.queue = queue;
this.running = true;
executor.execute(this);
}
@Override
public void run() {
thisThread = Thread.currentThread();
if (running)
try {
//System.err.println(name + " getting from queue");
final RequestResponse requestResponse = queue.take();
//System.err.println(name + " got from queue");
Packet response = null;
try {
response = resolve(requestResponse.getRequest());
} catch (Exception e) {
//e.printStackTrace();
System.err.println("FAILURE: " + name + ": " + e.getMessage());
}
if(response == null) {
// failed
if (requestResponse.getAndIncrementFailureCount() < maxRetries) {
//System.err.println(name + " putting in queue");
queue.put(requestResponse);
//System.err.println(name + " put in queue");
} else {
//System.err.println(name + " maxRetries reached SRVFAIL");
@SuppressWarnings("unchecked") final CompletableFuture<RequestResponse> cf = (CompletableFuture<RequestResponse>) requestResponse.getCompletableFuture();
cf.completeExceptionally(new Exception("SRVFAIL"));
}
} else {
requestResponse.setResponse(response);
//System.err.println(name + " got response: " + requestResponse.getResponse());
@SuppressWarnings("unchecked") final CompletableFuture<RequestResponse> cf = (CompletableFuture<RequestResponse>) requestResponse.getCompletableFuture();
//System.err.println(name + " completed: " + cf.complete(requestResponse));
cf.complete(requestResponse);
}
} catch (InterruptedException e) {
throw new RuntimeException("socketresolver take", e);
} finally {
if (running)
executor.execute(this);
}
}
@Override
public void close() {
running = false;
if (thisThread != null)
thisThread.interrupt();
}
}

View File

@ -11,9 +11,7 @@ public class BaseRequestResponse implements RequestResponse {
private Packet request, response;
private CompletableFuture<? extends RequestResponse> completableFuture;
private String requestPacketKey;
private int failureCount;
public BaseRequestResponse() {
}
@ -41,16 +39,6 @@ public class BaseRequestResponse implements RequestResponse {
this.response = response;
}
@Override
public CompletableFuture<? extends RequestResponse> getCompletableFuture() {
return completableFuture;
}
@Override
public void setCompletableFuture(final CompletableFuture<? extends RequestResponse> completableFuture) {
this.completableFuture = completableFuture;
}
@Override
public String getRequestPacketKey() {
return requestPacketKey;
@ -61,19 +49,12 @@ public class BaseRequestResponse implements RequestResponse {
this.requestPacketKey = requestPacketKey;
}
@Override
public final int getAndIncrementFailureCount() {
return ++failureCount;
}
@Override
public String toString() {
return "BaseRequestResponse{" +
"request=" + request +
", response=" + response +
", completableFuture=" + completableFuture +
", requestPacketKey=" + requestPacketKey +
", failureCount=" + failureCount +
'}';
}
}

View File

@ -1,30 +1,38 @@
package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.Util;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class BlockingQueueResolver implements Resolver {
public class BlockingQueueResolver implements MultiResolver {
private final BlockingQueue<RequestResponse> queue;
private final BlockingQueue<RequestResponseCompletableFuture<? extends RequestResponse>> queue;
public BlockingQueueResolver(final BlockingQueue<RequestResponse> queue) {
private final List<QueueProcessingResolver> queueProcessingResolvers;
public BlockingQueueResolver(final BlockingQueue<RequestResponseCompletableFuture<? extends RequestResponse>> queue, final ExecutorService executor, final Collection<Resolver> delegates) {
this.queue = queue;
if (delegates.isEmpty())
throw new IllegalArgumentException("must supply at least 1 resolver");
queueProcessingResolvers = delegates.stream().map(resolver -> new QueueProcessingResolver(resolver, executor, this.queue)).collect(Collectors.toList());
}
public BlockingQueueResolver(final int packetQueueLength) {
this(packetQueueLength < 1 ? new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(packetQueueLength));
}
public Resolver startQueueProcessingResolvers(final ExecutorService executor, final Iterable<QueueProcessingResolver> queueProcessingResolvers) {
for(final QueueProcessingResolver queueProcessingResolver : queueProcessingResolvers)
queueProcessingResolver.start(executor, this.queue);
return this;
public BlockingQueueResolver(final int packetQueueLength, final ExecutorService executor, final Collection<Resolver> delegates) {
this(packetQueueLength < 1 ? new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(packetQueueLength), executor, delegates);
}
@Override
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) {
final CompletableFuture<E> request = new CompletableFuture<>();
requestResponse.setCompletableFuture(request);
queue.add(requestResponse);
final RequestResponseCompletableFuture<E> request = new RequestResponseCompletableFuture<>(requestResponse);
queue.add(request);
return request;
}
@Override
public void close() {
queueProcessingResolvers.forEach(Util::tryClose);
}
}

View File

@ -12,20 +12,18 @@ import java.util.concurrent.*;
import static com.moparisthebest.dns.Util.supplyAsyncOnTimeOut;
public class CacheResolver implements Resolver, AutoCloseable {
public class CacheResolver extends WrappingResolver {
private final int staleResponseTtl;
private final long staleResponseTimeout;
private final Resolver delegate;
private final ScheduledExecutorService scheduledExecutorService;
private final ConcurrentMap<String, CachedPacket> cache = new ConcurrentHashMap<>();
public CacheResolver(final Resolver delegate, final int staleResponseTtl, final long staleResponseTimeout, final ScheduledExecutorService scheduledExecutorService,
final String cacheFile, final long cacheDelayMinutes) throws IOException {
this.delegate = delegate;
super(delegate);
this.staleResponseTtl = staleResponseTtl;
this.staleResponseTimeout = staleResponseTimeout;
this.scheduledExecutorService = scheduledExecutorService;

View File

@ -6,9 +6,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
public class MapResolver implements Resolver {
public class MapResolver extends WrappingResolver {
private final Resolver delegate;
private final Function<? super Packet, ? extends Packet> mapper;
public static Resolver minTtl(final int minTtl, final Resolver delegate) {
@ -17,7 +16,7 @@ public class MapResolver implements Resolver {
}
private MapResolver(final Resolver delegate, final Function<? super Packet, ? extends Packet> mapper) {
this.delegate = delegate;
super(delegate);
this.mapper = mapper;
}

View File

@ -0,0 +1,13 @@
package com.moparisthebest.dns.resolve;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
public interface MultiResolver extends Resolver {
public static MultiResolver of(final int packetQueueLength, final ExecutorService executor, final Collection<Resolver> delegates) {
return packetQueueLength < 0 ?
new RandomUpstreamResolver(delegates) :
new BlockingQueueResolver(packetQueueLength, executor, delegates);
}
}

View File

@ -1,27 +1,69 @@
package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.net.ParsedUrl;
import com.moparisthebest.dns.dto.Packet;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
public interface QueueProcessingResolver extends Resolver, Runnable, AutoCloseable {
void start(final ExecutorService executor, final BlockingQueue<RequestResponse> queue);
public class QueueProcessingResolver extends WrappingResolver implements Runnable {
public static QueueProcessingResolver of(final String resolver, final Map<String, String> upperLevelProps) {
return of(ParsedUrl.of(resolver, upperLevelProps));
private final ExecutorService executor;
private final BlockingQueue<RequestResponseCompletableFuture<? extends RequestResponse>> queue;
private boolean running = false;
private Thread thisThread = null;
public QueueProcessingResolver(final Resolver delegate, final ExecutorService executor, final BlockingQueue<RequestResponseCompletableFuture<? extends RequestResponse>> queue) {
super(delegate);
this.executor = executor;
this.queue = queue;
this.running = true;
executor.execute(this);
}
public static QueueProcessingResolver of(final String resolver) {
return of(ParsedUrl.of(resolver));
@Override
public void run() {
thisThread = Thread.currentThread();
if (running)
try {
//System.err.println(name + " getting from queue");
@SuppressWarnings("unchecked")
final RequestResponseCompletableFuture<RequestResponse> cf = (RequestResponseCompletableFuture<RequestResponse>) queue.take();
final RequestResponse requestResponse = cf.getRequestResponse();
//System.err.println(name + " got from queue");
Packet response = null;
Throwable resolveException = null;
try {
response = resolve(requestResponse.getRequest());
} catch (Throwable e) {
//e.printStackTrace();
//System.err.println("FAILURE: " + name + ": " + e.getMessage());
resolveException = e;
}
//resolveAsync(requestResponse, executor).get();
if(response == null) {
// failed
cf.completeExceptionally(resolveException == null ? new Exception("SRVFAIL") : resolveException);
} else {
requestResponse.setResponse(response);
//System.err.println(name + " got response: " + requestResponse.getResponse());
//System.err.println(name + " completed: " + cf.complete(requestResponse));
cf.complete(requestResponse);
}
} catch (InterruptedException e) {
throw new RuntimeException("socketresolver take", e);
} finally {
if (running)
executor.execute(this);
}
}
public static QueueProcessingResolver of(final ParsedUrl parsedUrl) {
final int maxRetries = Integer.parseInt(parsedUrl.getProps().getOrDefault("maxRetries", "5"));
String name = parsedUrl.getProps().get("name");
if(name == null)
name = parsedUrl.getUri().toString();
return new DelegatingQueueProcessingResolver(maxRetries, name, Resolver.of(parsedUrl));
@Override
public void close() {
running = false;
if (thisThread != null)
thisThread.interrupt();
}
}

View File

@ -2,11 +2,12 @@ package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.dto.Packet;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
public class RandomUpstreamResolver implements Resolver {
public class RandomUpstreamResolver implements MultiResolver {
private final Resolver[] delegates;
@ -14,6 +15,10 @@ public class RandomUpstreamResolver implements Resolver {
this.delegates = delegates;
}
public RandomUpstreamResolver(final Collection<Resolver> delegates) {
this.delegates = delegates.toArray(new Resolver[0]);
}
public Resolver random() {
return delegates[ThreadLocalRandom.current().nextInt(delegates.length)];
}

View File

@ -11,14 +11,7 @@ public interface RequestResponse {
/**
* These should only be used by resolvers, may be null
CompletableFuture<RequestResponse> getCompletableFuture();
void setCompletableFuture(CompletableFuture<RequestResponse> completableFuture);
<E extends RequestResponse> CompletableFuture<E> getCompletableFuture();
<E extends RequestResponse> void setCompletableFuture(CompletableFuture<E> completableFuture);
*/
CompletableFuture<? extends RequestResponse> getCompletableFuture();
void setCompletableFuture(CompletableFuture<? extends RequestResponse> completableFuture);
String getRequestPacketKey();
void setRequestPacketKey(String key);
int getAndIncrementFailureCount();
}

View File

@ -0,0 +1,16 @@
package com.moparisthebest.dns.resolve;
import java.util.concurrent.CompletableFuture;
public class RequestResponseCompletableFuture<E extends RequestResponse> extends CompletableFuture<E> {
private final E requestResponse;
public RequestResponseCompletableFuture(final E requestResponse) {
this.requestResponse = requestResponse;
}
public E getRequestResponse() {
return requestResponse;
}
}

View File

@ -0,0 +1,53 @@
package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.dto.Packet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
public class RetryResolver extends WrappingResolver {
protected final int maxRetries;
public static Resolver of(final int maxRetries, final Resolver delegate) {
// anything less than 1 just don't wrap
return maxRetries < 1 ? delegate : new RetryResolver(delegate, maxRetries);
}
private RetryResolver(final Resolver delegate, final int maxRetries) {
super(delegate);
this.maxRetries = maxRetries;
}
@Override
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) {
// todo: better async way to do this?
final CompletableFuture<E> ret = new CompletableFuture<>();
executor.execute(() -> {
try {
requestResponse.setResponse(resolve(requestResponse.getRequest()));
ret.complete(requestResponse);
} catch (Throwable e) {
ret.completeExceptionally(e);
}
});
return ret;
}
@Override
public Packet resolve(final Packet request) throws Exception {
for(int x = 0; x < maxRetries; ++x) {
try {
final Packet response = super.resolve(request);
if(response != null)
return response;
} catch (Exception e) {
e.printStackTrace();
//System.err.println("FAILURE: " + name + ": " + e.getMessage());
}
}
throw new Exception("SRVFAIL");
}
}

View File

@ -2,15 +2,16 @@ package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.dto.Packet;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
public class DelegatingQueueProcessingResolver extends AbstractQueueProcessingResolver {
public abstract class WrappingResolver implements Resolver {
private final Resolver delegate;
protected final Resolver delegate;
public DelegatingQueueProcessingResolver(final int maxRetries, final String name, final Resolver delegate) {
super(maxRetries, name);
public WrappingResolver(final Resolver delegate) {
Objects.requireNonNull(delegate);
this.delegate = delegate;
}
@ -24,9 +25,5 @@ public class DelegatingQueueProcessingResolver extends AbstractQueueProcessingRe
return delegate.resolve(request);
}
@Override
public void close() {
super.close();
delegate.close();
}
// we don't call delegate.close() on purpose, whatever created it should close it
}