Re-factor and document Resolver
This commit is contained in:
parent
964521f0cd
commit
f52f1de8ad
@ -43,7 +43,7 @@ public class TcpAsync implements Listener {
|
|||||||
bufChan.setRequest(new Packet(bufChan.buf));
|
bufChan.setRequest(new Packet(bufChan.buf));
|
||||||
//debugPacket(bufChan.getRequest().getBuf());
|
//debugPacket(bufChan.getRequest().getBuf());
|
||||||
|
|
||||||
resolver.resolveAsync(bufChan).whenCompleteAsync((bc, t) -> {
|
resolver.resolveAsync(bufChan, executor).whenCompleteAsync((bc, t) -> {
|
||||||
//System.out.println("got completed!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
|
//System.out.println("got completed!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
|
||||||
if(t != null) {
|
if(t != null) {
|
||||||
t.printStackTrace();
|
t.printStackTrace();
|
||||||
|
@ -42,7 +42,7 @@ public class UdpSync implements Listener {
|
|||||||
//System.out.println(requestResponse);
|
//System.out.println(requestResponse);
|
||||||
//debugPacket(requestResponse.getRequest().getBuf());
|
//debugPacket(requestResponse.getRequest().getBuf());
|
||||||
|
|
||||||
resolver.resolveAsync(requestResponse).whenCompleteAsync((urr, t) -> {
|
resolver.resolveAsync(requestResponse, executor).whenCompleteAsync((urr, t) -> {
|
||||||
if(t != null) {
|
if(t != null) {
|
||||||
t.printStackTrace();
|
t.printStackTrace();
|
||||||
return;
|
return;
|
||||||
|
@ -105,7 +105,7 @@ public class CacheResolver implements Resolver, AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse) {
|
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) {
|
||||||
final String key = calcRequestPacketKey(requestResponse.getRequest());
|
final String key = calcRequestPacketKey(requestResponse.getRequest());
|
||||||
//System.out.println("requestPacketKey: " + key);
|
//System.out.println("requestPacketKey: " + key);
|
||||||
final CachedPacket response = cache.get(key);
|
final CachedPacket response = cache.get(key);
|
||||||
@ -115,7 +115,7 @@ public class CacheResolver implements Resolver, AutoCloseable {
|
|||||||
if (response.isExpired(currentTime)) {
|
if (response.isExpired(currentTime)) {
|
||||||
//System.out.println("cachedPacket isExpired!");
|
//System.out.println("cachedPacket isExpired!");
|
||||||
requestResponse.setRequestPacketKey(key);
|
requestResponse.setRequestPacketKey(key);
|
||||||
final CompletableFuture<E> request = requestAndCache(requestResponse);
|
final CompletableFuture<E> request = requestAndCache(requestResponse, executor);
|
||||||
final CompletableFuture<E> stale = supplyAsyncOnTimeOut(scheduledExecutorService, staleResponseTimeout, TimeUnit.MILLISECONDS, () -> {
|
final CompletableFuture<E> stale = supplyAsyncOnTimeOut(scheduledExecutorService, staleResponseTimeout, TimeUnit.MILLISECONDS, () -> {
|
||||||
requestResponse.setResponse(response.getStaleResponse().setId(requestResponse.getRequest().getId()));
|
requestResponse.setResponse(response.getStaleResponse().setId(requestResponse.getRequest().getId()));
|
||||||
return requestResponse;
|
return requestResponse;
|
||||||
@ -129,7 +129,7 @@ public class CacheResolver implements Resolver, AutoCloseable {
|
|||||||
}
|
}
|
||||||
//System.out.println("no cachedPacket, querying upstream!");
|
//System.out.println("no cachedPacket, querying upstream!");
|
||||||
requestResponse.setRequestPacketKey(key);
|
requestResponse.setRequestPacketKey(key);
|
||||||
return requestAndCache(requestResponse);
|
return requestAndCache(requestResponse, executor);
|
||||||
/*
|
/*
|
||||||
// todo: should not have to do this, some upstreams seem to eat stuff though, figure that out, I think readTimeout fixed this
|
// todo: should not have to do this, some upstreams seem to eat stuff though, figure that out, I think readTimeout fixed this
|
||||||
final CompletableFuture<E> request = requestAndCache(requestResponse);
|
final CompletableFuture<E> request = requestAndCache(requestResponse);
|
||||||
@ -142,7 +142,7 @@ public class CacheResolver implements Resolver, AutoCloseable {
|
|||||||
|
|
||||||
//boolean first = true;
|
//boolean first = true;
|
||||||
|
|
||||||
private <E extends RequestResponse> CompletableFuture<E> requestAndCache(final E requestResponse) {
|
private <E extends RequestResponse> CompletableFuture<E> requestAndCache(final E requestResponse, final Executor executor) {
|
||||||
CompletableFuture<E> request = new CompletableFuture<>();
|
CompletableFuture<E> request = new CompletableFuture<>();
|
||||||
requestResponse.setCompletableFuture(request);
|
requestResponse.setCompletableFuture(request);
|
||||||
//if(first) {
|
//if(first) {
|
||||||
|
@ -3,6 +3,7 @@ package com.moparisthebest.dns.resolve;
|
|||||||
import com.moparisthebest.dns.dto.Packet;
|
import com.moparisthebest.dns.dto.Packet;
|
||||||
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
public class DelegatingQueueProcessingResolver extends AbstractQueueProcessingResolver {
|
public class DelegatingQueueProcessingResolver extends AbstractQueueProcessingResolver {
|
||||||
|
|
||||||
@ -14,8 +15,8 @@ public class DelegatingQueueProcessingResolver extends AbstractQueueProcessingRe
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse) {
|
public <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) {
|
||||||
return delegate.resolveAsync(requestResponse);
|
return delegate.resolveAsync(requestResponse, executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -5,27 +5,44 @@ import com.moparisthebest.dns.net.ParsedUrl;
|
|||||||
|
|
||||||
import java.util.ServiceLoader;
|
import java.util.ServiceLoader;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementations need to implement at least 1 of the resolve functions, either resolveAsync or resolve, the default
|
||||||
|
* implementations simply call the other one ending in a stack overflow.
|
||||||
|
*
|
||||||
|
* Ideally, implementations provide optimized versions of both the in-line and async call.
|
||||||
|
*/
|
||||||
public interface Resolver {
|
public interface Resolver {
|
||||||
default <E extends RequestResponse> CompletableFuture<E> resolveAsync(E requestResponse) {
|
|
||||||
try {
|
/**
|
||||||
requestResponse.setResponse(resolve(requestResponse.getRequest()));
|
* This must return immediately and resolve the DNS query in the background, using the given executor
|
||||||
return CompletableFuture.completedFuture(requestResponse);
|
* @param requestResponse
|
||||||
} catch (Exception e) {
|
* @param executor
|
||||||
final CompletableFuture<E> ret = new CompletableFuture<>();
|
* @param <E>
|
||||||
ret.completeExceptionally(e);
|
* @return
|
||||||
return ret;
|
*/
|
||||||
}
|
default <E extends RequestResponse> CompletableFuture<E> resolveAsync(final E requestResponse, final Executor executor) {
|
||||||
/*
|
final CompletableFuture<E> ret = new CompletableFuture<>();
|
||||||
return CompletableFuture.supplyAsync(() -> {
|
executor.execute(() -> {
|
||||||
requestResponse.setResponse(resolve(requestResponse.getRequest()));
|
try {
|
||||||
return requestResponse;
|
requestResponse.setResponse(resolve(requestResponse.getRequest()));
|
||||||
}, executor);
|
ret.complete(requestResponse);
|
||||||
*/
|
} catch (Throwable e) {
|
||||||
|
ret.completeExceptionally(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
default Packet resolve(Packet request) throws Exception {
|
/**
|
||||||
return resolveAsync(new BaseRequestResponse(request)).get().getResponse();
|
* This must block on resolving the given query
|
||||||
|
* @param request
|
||||||
|
* @return
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
default Packet resolve(final Packet request) throws Exception {
|
||||||
|
return resolveAsync(new BaseRequestResponse(request), Runnable::run).get().getResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
ServiceLoader<Services> services = ServiceLoader.load(Services.class);
|
ServiceLoader<Services> services = ServiceLoader.load(Services.class);
|
||||||
|
@ -51,7 +51,7 @@ public class XmppListener implements Listener {
|
|||||||
//System.out.println("good request: " + req);
|
//System.out.println("good request: " + req);
|
||||||
final XmppRequestResponse requestResponse = new XmppRequestResponse(req.getFrom(), new Packet(request));
|
final XmppRequestResponse requestResponse = new XmppRequestResponse(req.getFrom(), new Packet(request));
|
||||||
|
|
||||||
resolver.resolveAsync(requestResponse).whenCompleteAsync((urr, t) -> {
|
resolver.resolveAsync(requestResponse, executor).whenCompleteAsync((urr, t) -> {
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
t.printStackTrace();
|
t.printStackTrace();
|
||||||
return;
|
return;
|
||||||
|
Loading…
Reference in New Issue
Block a user