RequestResponse and subclasses are no more, resolveAsync simply takes/supplies Packet like resolve

This commit is contained in:
Travis Burtrum 2019-04-14 01:43:02 -04:00
parent 62dac26b00
commit 520957ce9d
18 changed files with 74 additions and 201 deletions

View File

@ -1,13 +1,10 @@
package com.moparisthebest.dns.listen; package com.moparisthebest.dns.listen;
import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Base64; import java.util.Base64;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import com.moparisthebest.dns.dto.Packet; import com.moparisthebest.dns.dto.Packet;
import com.moparisthebest.dns.resolve.BaseRequestResponse;
import com.moparisthebest.dns.resolve.Resolver; import com.moparisthebest.dns.resolve.Resolver;
import org.springframework.http.HttpEntity; import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;

View File

@ -36,14 +36,13 @@ public class TcpAsync implements Listener {
this.local = local; this.local = local;
dnsRequestRead = new FullReadCompletionHandler() { dnsRequestRead = new FullReadCompletionHandler() {
@Override @Override
public void completed(final BufChan bufChan) { public void completed(final BufChan bc) {
try { try {
bufChan.buf.flip(); bc.buf.flip();
bufChan.setRequest(new Packet(bufChan.buf)); //debugPacket(new Packet(bc.buf).getBuf());
//debugPacket(bufChan.getRequest().getBuf());
resolver.resolveAsync(bufChan, executor).whenCompleteAsync((bc, t) -> { resolver.resolveAsync(new Packet(bc.buf), executor).whenCompleteAsync((response, t) -> {
//System.out.println("got completed!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); //System.out.println("got completed!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
if(t != null) { if(t != null) {
t.printStackTrace(); t.printStackTrace();
@ -52,13 +51,13 @@ public class TcpAsync implements Listener {
//debugPacket(bc.getResponse().getBuf()); //debugPacket(bc.getResponse().getBuf());
bc.tcpHead.clear(); bc.tcpHead.clear();
bc.tcpHead.putShort((short) bc.getResponse().getBuf().capacity()); bc.tcpHead.putShort((short) response.getBuf().capacity());
bc.tcpHead.rewind(); bc.tcpHead.rewind();
bc.buf = bc.tcpHead; bc.buf = bc.tcpHead;
bc.write((FullWriteCompletionHandler) (bc2) -> { bc.write((FullWriteCompletionHandler) (bc2) -> {
//System.out.println("header write complete"); //System.out.println("header write complete");
bc2.buf = bc2.getResponse().getBuf(); bc2.buf = response.getBuf();
bc2.buf.rewind(); bc2.buf.rewind();
bc2.write((FullWriteCompletionHandler) (unused) -> { bc2.write((FullWriteCompletionHandler) (unused) -> {
//System.out.println("body write complete"); //System.out.println("body write complete");
@ -69,7 +68,7 @@ public class TcpAsync implements Listener {
e.printStackTrace(); e.printStackTrace();
} }
BufChan.forTcp(bufChan.sock).read(dnsSizeRead); BufChan.forTcp(bc.sock).read(dnsSizeRead);
} }
}; };
dnsSizeRead = bc -> { dnsSizeRead = bc -> {

View File

@ -37,12 +37,12 @@ public class UdpSync implements Listener {
ss.receive(request); ss.receive(request);
//System.out.println("got packet"); //System.out.println("got packet");
final UdpRequestResponse requestResponse = new UdpRequestResponse(request.getSocketAddress(), final SocketAddress requester = request.getSocketAddress();
new Packet(ByteBuffer.wrap(request.getData(), request.getOffset(), request.getLength()).slice())); final Packet requestPacket = new Packet(ByteBuffer.wrap(request.getData(), request.getOffset(), request.getLength()).slice());
//System.out.println(requestResponse); //System.out.println(requestResponse);
//debugPacket(requestResponse.getRequest().getBuf()); //debugPacket(requestResponse.getRequest().getBuf());
resolver.resolveAsync(requestResponse, executor).whenCompleteAsync((urr, t) -> { resolver.resolveAsync(requestPacket, executor).whenCompleteAsync((resp, t) -> {
if(t != null) { if(t != null) {
t.printStackTrace(); t.printStackTrace();
return; return;
@ -50,10 +50,10 @@ public class UdpSync implements Listener {
//debugPacket(urr.getResponse().getBuf()); //debugPacket(urr.getResponse().getBuf());
//System.out.println("got response"); //System.out.println("got response");
final byte[] response = urr.getResponse().getBuf().array(); final byte[] response = resp.getBuf().array();
final DatagramPacket responsePacket = new DatagramPacket(response, response.length); // todo: always exact length? meh final DatagramPacket responsePacket = new DatagramPacket(response, response.length); // todo: always exact length? meh
responsePacket.setSocketAddress(urr.getRequester()); responsePacket.setSocketAddress(requester);
try { try {
ss.send(responsePacket); ss.send(responsePacket);

View File

@ -1,15 +1,10 @@
package com.moparisthebest.dns.net; package com.moparisthebest.dns.net;
import com.moparisthebest.dns.dto.Packet;
import com.moparisthebest.dns.resolve.BaseRequestResponse;
import com.moparisthebest.dns.resolve.RequestResponse;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
public class BufChan extends BaseRequestResponse { public class BufChan {
public final ByteBuffer tcpHead; public final ByteBuffer tcpHead;
public final AsynchronousSocketChannel sock; public final AsynchronousSocketChannel sock;

View File

@ -1,27 +0,0 @@
package com.moparisthebest.dns.net;
import com.moparisthebest.dns.dto.Packet;
import com.moparisthebest.dns.resolve.BaseRequestResponse;
import java.net.SocketAddress;
public class UdpRequestResponse extends BaseRequestResponse {
private final SocketAddress requester;
public UdpRequestResponse(final SocketAddress requester, final Packet request) {
super(request);
this.requester = requester;
}
public SocketAddress getRequester() {
return requester;
}
@Override
public String toString() {
return "UdpRequestResponse{" +
"requester=" + requester +
"} " + super.toString();
}
}

View File

@ -1,47 +0,0 @@
package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.dto.Packet;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
public class BaseRequestResponse implements RequestResponse {
private Packet request, response;
public BaseRequestResponse() {
}
public BaseRequestResponse(final Packet request) {
this.request = request;
}
@Override
public Packet getRequest() {
return request;
}
public void setRequest(final Packet request) {
this.request = request;
}
@Override
public Packet getResponse() {
return response;
}
@Override
public void setResponse(final Packet response) {
this.response = response;
}
@Override
public String toString() {
return "BaseRequestResponse{" +
"request=" + request +
", response=" + response +
'}';
}
}

View File

@ -1,6 +1,7 @@
package com.moparisthebest.dns.resolve; package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.Util; import com.moparisthebest.dns.Util;
import com.moparisthebest.dns.dto.Packet;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -9,11 +10,11 @@ import java.util.stream.Collectors;
public class BlockingQueueResolver implements MultiResolver { public class BlockingQueueResolver implements MultiResolver {
private final BlockingQueue<RequestResponseCompletableFuture<? extends RequestResponse>> queue; private final BlockingQueue<RequestCompletableFuture> queue;
private final List<QueueProcessingResolver> queueProcessingResolvers; private final List<QueueProcessingResolver> queueProcessingResolvers;
public BlockingQueueResolver(final BlockingQueue<RequestResponseCompletableFuture<? extends RequestResponse>> queue, final ExecutorService executor, final Collection<Resolver> delegates) { public BlockingQueueResolver(final BlockingQueue<RequestCompletableFuture> queue, final ExecutorService executor, final Collection<Resolver> delegates) {
this.queue = queue; this.queue = queue;
if (delegates.isEmpty()) if (delegates.isEmpty())
throw new IllegalArgumentException("must supply at least 1 resolver"); throw new IllegalArgumentException("must supply at least 1 resolver");
@ -25,10 +26,10 @@ public class BlockingQueueResolver implements MultiResolver {
} }
@Override @Override
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) { public CompletableFuture<Packet> resolveAsync(final Packet request, final Executor executor) {
final RequestResponseCompletableFuture<E> request = new RequestResponseCompletableFuture<>(requestResponse); final RequestCompletableFuture ret = new RequestCompletableFuture(request);
queue.add(request); queue.add(ret);
return request; return ret;
} }
@Override @Override

View File

@ -90,8 +90,8 @@ public class CacheResolver extends WrappingResolver {
} }
@Override @Override
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) { public CompletableFuture<Packet> resolveAsync(final Packet request, final Executor executor) {
final String key = calcRequestPacketKey(requestResponse.getRequest()); final String key = calcRequestPacketKey(request);
//System.out.println("requestPacketKey: " + key); //System.out.println("requestPacketKey: " + key);
final CachedPacket response = cache.get(key); final CachedPacket response = cache.get(key);
//System.out.println("cachedPacket: " + response); //System.out.println("cachedPacket: " + response);
@ -99,20 +99,17 @@ public class CacheResolver extends WrappingResolver {
final long currentTime = currentTimeSeconds(); final long currentTime = currentTimeSeconds();
if (response.isExpired(currentTime)) { if (response.isExpired(currentTime)) {
//System.out.println("cachedPacket isExpired!"); //System.out.println("cachedPacket isExpired!");
final CompletableFuture<E> request = requestAndCache(key, requestResponse, executor); final CompletableFuture<Packet> ret = requestAndCache(key, request, executor);
final CompletableFuture<E> stale = supplyAsyncOnTimeOut(scheduledExecutorService, staleResponseTimeout, TimeUnit.MILLISECONDS, () -> { final CompletableFuture<Packet> stale = supplyAsyncOnTimeOut(scheduledExecutorService, staleResponseTimeout, TimeUnit.MILLISECONDS,
requestResponse.setResponse(response.getStaleResponse().setId(requestResponse.getRequest().getId())); () -> response.getStaleResponse().setId(request.getId()));
return requestResponse; return ret.applyToEitherAsync(stale, s -> s);
});
return request.applyToEitherAsync(stale, s -> s);
} else { } else {
//System.out.println("cachedPacket returning from cache!"); //System.out.println("cachedPacket returning from cache!");
requestResponse.setResponse(response.getResponse(currentTime).setId(requestResponse.getRequest().getId())); return CompletableFuture.completedFuture(response.getResponse(currentTime).setId(request.getId()));
return CompletableFuture.completedFuture(requestResponse);
} }
} }
//System.out.println("no cachedPacket, querying upstream!"); //System.out.println("no cachedPacket, querying upstream!");
return requestAndCache(key, requestResponse, executor); return requestAndCache(key, request, executor);
/* /*
// todo: should not have to do this, some upstreams seem to eat stuff though, figure that out, I think readTimeout fixed this // todo: should not have to do this, some upstreams seem to eat stuff though, figure that out, I think readTimeout fixed this
final CompletableFuture<E> request = requestAndCache(requestResponse); final CompletableFuture<E> request = requestAndCache(requestResponse);
@ -123,14 +120,14 @@ public class CacheResolver extends WrappingResolver {
*/ */
} }
private <E extends RequestResponse> CompletableFuture<E> requestAndCache(final String key, final E requestResponse, final Executor executor) { private CompletableFuture<Packet> requestAndCache(final String key, final Packet request, final Executor executor) {
final CompletableFuture<E> request = delegate.resolveAsync(requestResponse, executor); final CompletableFuture<Packet> ret = delegate.resolveAsync(request, executor);
request.thenAcceptAsync(s -> { ret.thenAcceptAsync(s -> {
final Packet response = s.getResponse().copy(); // todo: do we need to copy? final Packet response = s.copy(); // todo: do we need to copy?
final long currentTime = currentTimeSeconds(); final long currentTime = currentTimeSeconds();
cache.put(key, new CachedPacket(response, currentTime, currentTime + response.getLowestTtl())); cache.put(key, new CachedPacket(response, currentTime, currentTime + response.getLowestTtl()));
}, executor); }, executor);
return request; return ret;
} }
public void persistCache(final File file, final Map<String, CachedPacket> cache) throws IOException { public void persistCache(final File file, final Map<String, CachedPacket> cache) throws IOException {

View File

@ -21,11 +21,8 @@ public class MapResolver extends WrappingResolver {
} }
@Override @Override
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) { public CompletableFuture<Packet> resolveAsync(final Packet request, final Executor executor) {
return delegate.resolveAsync(requestResponse, executor).thenApply(s -> { return delegate.resolveAsync(request, executor).thenApply(mapper);
s.setResponse(mapper.apply(s.getResponse()));
return s;
});
} }
@Override @Override

View File

@ -3,18 +3,17 @@ package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.dto.Packet; import com.moparisthebest.dns.dto.Packet;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
public class QueueProcessingResolver extends WrappingResolver implements Runnable { public class QueueProcessingResolver extends WrappingResolver implements Runnable {
private final ExecutorService executor; private final ExecutorService executor;
private final BlockingQueue<RequestResponseCompletableFuture<? extends RequestResponse>> queue; private final BlockingQueue<RequestCompletableFuture> queue;
private boolean running = false; private boolean running = false;
private Thread thisThread = null; private Thread thisThread = null;
public QueueProcessingResolver(final Resolver delegate, final ExecutorService executor, final BlockingQueue<RequestResponseCompletableFuture<? extends RequestResponse>> queue) { public QueueProcessingResolver(final Resolver delegate, final ExecutorService executor, final BlockingQueue<RequestCompletableFuture> queue) {
super(delegate); super(delegate);
this.executor = executor; this.executor = executor;
this.queue = queue; this.queue = queue;
@ -28,14 +27,12 @@ public class QueueProcessingResolver extends WrappingResolver implements Runnabl
if (running) if (running)
try { try {
//System.err.println(name + " getting from queue"); //System.err.println(name + " getting from queue");
@SuppressWarnings("unchecked") final RequestCompletableFuture cf = queue.take();
final RequestResponseCompletableFuture<RequestResponse> cf = (RequestResponseCompletableFuture<RequestResponse>) queue.take();
final RequestResponse requestResponse = cf.getRequestResponse();
//System.err.println(name + " got from queue"); //System.err.println(name + " got from queue");
Packet response = null; Packet response = null;
Throwable resolveException = null; Throwable resolveException = null;
try { try {
response = resolve(requestResponse.getRequest()); response = resolve(cf.getRequest());
} catch (Throwable e) { } catch (Throwable e) {
//e.printStackTrace(); //e.printStackTrace();
//System.err.println("FAILURE: " + name + ": " + e.getMessage()); //System.err.println("FAILURE: " + name + ": " + e.getMessage());
@ -47,10 +44,9 @@ public class QueueProcessingResolver extends WrappingResolver implements Runnabl
// failed // failed
cf.completeExceptionally(resolveException == null ? new Exception("SRVFAIL") : resolveException); cf.completeExceptionally(resolveException == null ? new Exception("SRVFAIL") : resolveException);
} else { } else {
requestResponse.setResponse(response); //System.err.println(name + " got response: " + response);
//System.err.println(name + " got response: " + requestResponse.getResponse()); //System.err.println(name + " completed: " + cf.complete(response));
//System.err.println(name + " completed: " + cf.complete(requestResponse)); cf.complete(response);
cf.complete(requestResponse);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException("socketresolver take", e); throw new RuntimeException("socketresolver take", e);

View File

@ -24,8 +24,8 @@ public class RandomUpstreamResolver implements MultiResolver {
} }
@Override @Override
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) { public CompletableFuture<Packet> resolveAsync(final Packet request, final Executor executor) {
return random().resolveAsync(requestResponse, executor); return random().resolveAsync(request, executor);
} }
@Override @Override

View File

@ -0,0 +1,18 @@
package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.dto.Packet;
import java.util.concurrent.CompletableFuture;
public class RequestCompletableFuture extends CompletableFuture<Packet> {
private final Packet request;
public RequestCompletableFuture(final Packet request) {
this.request = request;
}
public Packet getRequest() {
return request;
}
}

View File

@ -1,11 +0,0 @@
package com.moparisthebest.dns.resolve;
import com.moparisthebest.dns.dto.Packet;
import java.util.concurrent.CompletableFuture;
public interface RequestResponse {
Packet getRequest();
Packet getResponse();
void setResponse(Packet response);
}

View File

@ -1,16 +0,0 @@
package com.moparisthebest.dns.resolve;
import java.util.concurrent.CompletableFuture;
public class RequestResponseCompletableFuture<E extends RequestResponse> extends CompletableFuture<E> {
private final E requestResponse;
public RequestResponseCompletableFuture(final E requestResponse) {
this.requestResponse = requestResponse;
}
public E getRequestResponse() {
return requestResponse;
}
}

View File

@ -17,17 +17,15 @@ public interface Resolver extends AutoCloseable {
/** /**
* This must return immediately and resolve the DNS query in the background, using the given executor * This must return immediately and resolve the DNS query in the background, using the given executor
* @param requestResponse * @param request
* @param executor * @param executor
* @param <E>
* @return * @return
*/ */
default <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) { default CompletableFuture<Packet> resolveAsync(final Packet request, final Executor executor) {
final CompletableFuture<E> ret = new CompletableFuture<>(); final CompletableFuture<Packet> ret = new CompletableFuture<>();
executor.execute(() -> { executor.execute(() -> {
try { try {
requestResponse.setResponse(resolve(requestResponse.getRequest())); ret.complete(resolve(request));
ret.complete(requestResponse);
} catch (Throwable e) { } catch (Throwable e) {
ret.completeExceptionally(e); ret.completeExceptionally(e);
} }
@ -42,7 +40,7 @@ public interface Resolver extends AutoCloseable {
* @throws Exception * @throws Exception
*/ */
default Packet resolve(final Packet request) throws Exception { default Packet resolve(final Packet request) throws Exception {
return resolveAsync(new BaseRequestResponse(request), Runnable::run).get().getResponse(); return resolveAsync(request, Runnable::run).get();
} }
default void close() { default void close() {

View File

@ -22,13 +22,12 @@ public class RetryResolver extends WrappingResolver {
} }
@Override @Override
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) { public CompletableFuture<Packet> resolveAsync(final Packet request, final Executor executor) {
// todo: better async way to do this? // todo: better async way to do this?
final CompletableFuture<E> ret = new CompletableFuture<>(); final CompletableFuture<Packet> ret = new CompletableFuture<>();
executor.execute(() -> { executor.execute(() -> {
try { try {
requestResponse.setResponse(resolve(requestResponse.getRequest())); ret.complete(resolve(request));
ret.complete(requestResponse);
} catch (Throwable e) { } catch (Throwable e) {
ret.completeExceptionally(e); ret.completeExceptionally(e);
} }

View File

@ -16,8 +16,8 @@ public abstract class WrappingResolver implements Resolver {
} }
@Override @Override
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) { public CompletableFuture<Packet> resolveAsync(final Packet request, final Executor executor) {
return delegate.resolveAsync(requestResponse, executor); return delegate.resolveAsync(request, executor);
} }
@Override @Override

View File

@ -2,7 +2,6 @@ package com.moparisthebest.dns.listen;
import com.moparisthebest.dns.dto.Packet; import com.moparisthebest.dns.dto.Packet;
import com.moparisthebest.dns.net.ParsedUrl; import com.moparisthebest.dns.net.ParsedUrl;
import com.moparisthebest.dns.resolve.BaseRequestResponse;
import com.moparisthebest.dns.resolve.Resolver; import com.moparisthebest.dns.resolve.Resolver;
import com.moparisthebest.dns.xmpp.ConnectionDetails; import com.moparisthebest.dns.xmpp.ConnectionDetails;
import com.moparisthebest.dns.xmpp.DnsIq; import com.moparisthebest.dns.xmpp.DnsIq;
@ -49,16 +48,15 @@ public class XmppListener implements Listener {
final byte[] request = DnsIq.parseDnsIq(req); final byte[] request = DnsIq.parseDnsIq(req);
if (request != null) { if (request != null) {
//System.out.println("good request: " + req); //System.out.println("good request: " + req);
final XmppRequestResponse requestResponse = new XmppRequestResponse(req.getFrom(), new Packet(request));
resolver.resolveAsync(requestResponse, executor).whenCompleteAsync((urr, t) -> { resolver.resolveAsync(new Packet(request), executor).whenCompleteAsync((urr, t) -> {
if (t != null) { if (t != null) {
t.printStackTrace(); t.printStackTrace();
return; return;
} }
//debugPacket(urr.getResponse().getBuf()); //debugPacket(urr.getResponse().getBuf());
final IQ resp = DnsIq.responseFor(req, urr.getResponse().getBuf()); final IQ resp = DnsIq.responseFor(req, urr.getBuf());
try { try {
//System.out.println("dns response: " + resp.toString()); //System.out.println("dns response: " + resp.toString());
@ -129,25 +127,4 @@ public class XmppListener implements Listener {
if (thisThread != null) if (thisThread != null)
thisThread.interrupt(); thisThread.interrupt();
} }
public class XmppRequestResponse extends BaseRequestResponse {
private final Jid requester;
public XmppRequestResponse(final Jid requester, final Packet request) {
super(request);
this.requester = requester;
}
public Jid getRequester() {
return requester;
}
@Override
public String toString() {
return "XmppRequestResponse{" +
"requester=" + requester +
"} " + super.toString();
}
}
} }