Implement persistant caching to disk on an interval

This commit is contained in:
Travis Burtrum 2018-03-10 01:10:25 -05:00
parent b45ffed48a
commit 0a9c4405f1
6 changed files with 81 additions and 9 deletions

View File

@ -6,6 +6,11 @@ staleResponseTimeout=1000
# staleResponseTtl: TTL to apply to stale record when above timeout is met and stale record is served, default 10
staleResponseTtl=10
# cacheFile: path to file to persist cache to at an interval
cacheFile=dnscache.map
# cacheDelayMinutes: how often to write the cache to disk
cacheDelayMinutes=60
# packetQueueLength: maximum requests queued waiting for responses from upstream, all resolvers specified process from this queue, cached responses don't enter this queue, default 100, 0 means unlimited
packetQueueLength=100

View File

@ -40,6 +40,9 @@ public class DnsProxy {
final long staleResponseTimeout = Long.parseLong(config.getOrDefault("staleResponseTimeout", "1000"));
final int packetQueueLength = Integer.parseInt(config.getOrDefault("packetQueueLength", "100"));
final String cacheFile = config.get("cacheFile");
final long cacheDelayMinutes = Long.parseLong(config.getOrDefault("cacheDelayMinutes", "60"));
final String[] resolvers = config.getOrDefault("resolvers", "https://dns.google.com/experimental?ct#name=dns.google.com").split("\\s+");
if (!config.containsKey("maxRetries"))
config.put("maxRetries", String.valueOf(resolvers.length * 2));
@ -54,7 +57,8 @@ public class DnsProxy {
final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(40);
final ExecutorService executor = scheduledExecutorService;//ForkJoinPool.commonPool();
final CacheResolver resolver = new CacheResolver(minTtl, staleResponseTtl, staleResponseTimeout, packetQueueLength, executor, scheduledExecutorService)
final CacheResolver resolver = new CacheResolver(minTtl, staleResponseTtl, staleResponseTimeout, packetQueueLength,
executor, scheduledExecutorService, cacheFile, cacheDelayMinutes)
.startQueueProcessingResolvers(queueProcessingResolvers);
final List<Listener> listeners = Arrays.stream(config.getOrDefault("listeners", "tcp://127.0.0.1:5353 udp://127.0.0.1:5353").split("\\s+"))

View File

@ -28,7 +28,7 @@ public class Packet extends AbstractBufferWindow {
throw new RuntimeException("header too short");
}
private Packet(final ByteBuffer buf, final int start, final int end) {
public Packet(final ByteBuffer buf, final int start, final int end) {
super(buf, start);
this.end = end;
}
@ -235,11 +235,20 @@ public class Packet extends AbstractBufferWindow {
public Packet copy() {
final ByteBuffer copy = ByteBuffer.allocate(getEnd() - start);
final ByteBuffer buf = this.buf.duplicate();
buf.position(start);
copy.put(buf);
return new Packet(copy, start, end);
}
public byte[] copyRaw() {
final byte[] copy = new byte[getEnd() - start];
final ByteBuffer buf = this.buf.duplicate();
buf.position(start);
buf.get(copy);
return copy;
}
public String getDohBase64() {
// todo: remove trailing equals, this goes outside limit...
return Base64.getUrlEncoder().encodeToString(getBuf().array());

View File

@ -12,7 +12,7 @@ public class BaseRequestResponse implements RequestResponse {
private Packet request, response;
private CompletableFuture<? extends RequestResponse> completableFuture;
private Object requestPacketKey;
private String requestPacketKey;
private int failureCount;
public BaseRequestResponse() {
@ -52,12 +52,12 @@ public class BaseRequestResponse implements RequestResponse {
}
@Override
public Object getRequestPacketKey() {
public String getRequestPacketKey() {
return requestPacketKey;
}
@Override
public void setRequestPacketKey(final Object requestPacketKey) {
public void setRequestPacketKey(final String requestPacketKey) {
this.requestPacketKey = requestPacketKey;
}

View File

@ -3,6 +3,11 @@ package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.dto.Packet;
import com.moparisthebest.dns.dto.Question;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.concurrent.*;
import static com.moparisthebest.dns.Util.supplyAsyncOnTimeOut;
@ -16,15 +21,27 @@ public class CacheResolver implements Resolver, AutoCloseable {
private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutorService;
private final ConcurrentMap<Object, CachedPacket> cache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, CachedPacket> cache = new ConcurrentHashMap<>();
public CacheResolver(final int minTtl, final int staleResponseTtl, final long staleResponseTimeout, final int packetQueueLength, final ExecutorService executor, final ScheduledExecutorService scheduledExecutorService) {
public CacheResolver(final int minTtl, final int staleResponseTtl, final long staleResponseTimeout, final int packetQueueLength, final ExecutorService executor, final ScheduledExecutorService scheduledExecutorService,
final String cacheFile, final long cacheDelayMinutes) throws IOException {
this.minTtl = minTtl;
this.staleResponseTtl = staleResponseTtl;
this.staleResponseTimeout = staleResponseTimeout;
this.queue = packetQueueLength < 1 ? new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(packetQueueLength);
this.executor = executor;
this.scheduledExecutorService = scheduledExecutorService;
if(cacheFile != null && !cacheFile.isEmpty()) {
final File cacheFileFile = new File(cacheFile);
readCache(cacheFileFile, cache);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
try {
persistCache(cacheFileFile, cache);
} catch (IOException e) {
e.printStackTrace();
}
}, cacheDelayMinutes, cacheDelayMinutes, TimeUnit.MINUTES);
}
}
public CacheResolver startQueueProcessingResolvers(final Iterable<QueueProcessingResolver> queueProcessingResolvers) {
@ -151,4 +168,41 @@ public class CacheResolver implements Resolver, AutoCloseable {
public Packet resolve(final Packet request) throws Exception {
return resolveAsync(new BaseRequestResponse(request)).get().getResponse();
}
public void persistCache(final File file, final Map<String, CachedPacket> cache) throws IOException {
final File tmpFile = new File(file.getAbsolutePath() + ".tmp");
try(FileOutputStream fos = new FileOutputStream(tmpFile);
DataOutputStream dos = new DataOutputStream(fos)) {
for(final Map.Entry<String, CachedPacket> entry : cache.entrySet()) {
dos.writeUTF(entry.getKey());
final CachedPacket cp = entry.getValue();
final byte[] rawCopy = cp.response.copyRaw();
dos.writeInt(rawCopy.length);
dos.write(rawCopy);
dos.writeLong(cp.receivedSeconds);
dos.writeLong(cp.expiredSeconds);
}
}
// after the file is fully written, move it into place, should be atomic
Files.move(tmpFile.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
public void readCache(final File file, final Map<String, CachedPacket> cache) throws IOException {
if(file.exists())
try(FileInputStream fis = new FileInputStream(file);
DataInputStream dis = new DataInputStream(fis)) {
final String key = dis.readUTF();
final byte[] packet = new byte[dis.readInt()];
dis.readFully(packet);
cache.put(key, new CachedPacket(
new Packet(ByteBuffer.wrap(packet), 0, packet.length),
dis.readLong(), dis.readLong()));
} catch(EOFException e) {
// ignore this, we just hit end of file
}
}
}

View File

@ -18,7 +18,7 @@ public interface RequestResponse {
*/
CompletableFuture<? extends RequestResponse> getCompletableFuture();
void setCompletableFuture(CompletableFuture<? extends RequestResponse> completableFuture);
Object getRequestPacketKey();
void setRequestPacketKey(Object key);
String getRequestPacketKey();
void setRequestPacketKey(String key);
int getAndIncrementFailureCount();
}