Re-factor Resolver to provide async/inline default impls so only 1 needs implemented, implement simple DirectProxy mode that takes a listener and resolver only

This commit is contained in:
Travis Burtrum 2019-03-14 01:10:29 -04:00
parent 9c925d2046
commit 841ea530f4
4 changed files with 23 additions and 13 deletions

View File

@ -4,6 +4,7 @@ import com.moparisthebest.dns.listen.Listener;
import com.moparisthebest.dns.net.ParsedUrl; import com.moparisthebest.dns.net.ParsedUrl;
import com.moparisthebest.dns.resolve.CacheResolver; import com.moparisthebest.dns.resolve.CacheResolver;
import com.moparisthebest.dns.resolve.QueueProcessingResolver; import com.moparisthebest.dns.resolve.QueueProcessingResolver;
import com.moparisthebest.dns.resolve.Resolver;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -17,6 +18,12 @@ public class DnsProxy {
public static void main(String[] args) throws Throwable { public static void main(String[] args) throws Throwable {
if (args.length == 2) {
// quick hack to run direct proxy todo: make this shut down clean
Listener.of(ParsedUrl.of(args[0]), Resolver.of(ParsedUrl.of(args[1])), ForkJoinPool.commonPool()).run();
return;
}
final Map<String, String> config; final Map<String, String> config;
final File propsFile = new File(args.length > 0 ? args[0] : "jdnsproxy.properties"); final File propsFile = new File(args.length > 0 ? args[0] : "jdnsproxy.properties");
if(propsFile.canRead()) { if(propsFile.canRead()) {
@ -62,7 +69,7 @@ public class DnsProxy {
.startQueueProcessingResolvers(queueProcessingResolvers); .startQueueProcessingResolvers(queueProcessingResolvers);
final List<Listener> listeners = Arrays.stream(config.getOrDefault("listeners", "tcp://127.0.0.1:5353 udp://127.0.0.1:5353").split("\\s+")) final List<Listener> listeners = Arrays.stream(config.getOrDefault("listeners", "tcp://127.0.0.1:5353 udp://127.0.0.1:5353").split("\\s+"))
.map(url -> Listener.ofAndStart(url, resolver, executor)).collect(Collectors.toList()); .map(url -> Listener.ofAndStart(ParsedUrl.of(url), resolver, executor)).collect(Collectors.toList());
//final List<Listener> listeners = new ArrayList<>(); //final List<Listener> listeners = new ArrayList<>();
//listeners.add(Listener.ofAndStart("tcp://127.0.0.1:5556", resolver, executor)); //listeners.add(Listener.ofAndStart("tcp://127.0.0.1:5556", resolver, executor));
//listeners.add(Listener.ofAndStart("udp://127.0.0.1:5556", resolver, executor)); //listeners.add(Listener.ofAndStart("udp://127.0.0.1:5556", resolver, executor));

View File

@ -13,18 +13,17 @@ public interface Listener extends Runnable, AutoCloseable {
ServiceLoader<Services> services = ServiceLoader.load(Services.class); ServiceLoader<Services> services = ServiceLoader.load(Services.class);
static Listener of(final String listener, final Resolver resolver, final ExecutorService executor) { static Listener of(final ParsedUrl parsedUrl, final Resolver resolver, final ExecutorService executor) {
final ParsedUrl parsedUrl = ParsedUrl.of(listener);
for (final Services s : services) { for (final Services s : services) {
final Listener ret = s.getListener(parsedUrl, resolver, executor); final Listener ret = s.getListener(parsedUrl, resolver, executor);
if (ret != null) if (ret != null)
return ret; return ret;
} }
throw new IllegalArgumentException("unhandled listener format: " + listener); throw new IllegalArgumentException("unhandled listener format: " + parsedUrl.getUrlStr());
} }
public static Listener ofAndStart(final String listener, final Resolver resolver, final ExecutorService executor) { public static Listener ofAndStart(final ParsedUrl parsedUrl, final Resolver resolver, final ExecutorService executor) {
final Listener ret = of(listener, resolver, executor); final Listener ret = of(parsedUrl, resolver, executor);
executor.execute(ret); executor.execute(ret);
return ret; return ret;
} }

View File

@ -164,11 +164,6 @@ public class CacheResolver implements Resolver, AutoCloseable {
return request; return request;
} }
@Override
public Packet resolve(final Packet request) throws Exception {
return resolveAsync(new BaseRequestResponse(request)).get().getResponse();
}
public void persistCache(final File file, final Map<String, CachedPacket> cache) throws IOException { public void persistCache(final File file, final Map<String, CachedPacket> cache) throws IOException {
final File tmpFile = new File(file.getAbsolutePath() + ".tmp"); final File tmpFile = new File(file.getAbsolutePath() + ".tmp");
try(FileOutputStream fos = new FileOutputStream(tmpFile); try(FileOutputStream fos = new FileOutputStream(tmpFile);

View File

@ -8,7 +8,14 @@ import java.util.concurrent.CompletableFuture;
public interface Resolver { public interface Resolver {
default <E extends RequestResponse> CompletableFuture<E> resolveAsync(E requestResponse) { default <E extends RequestResponse> CompletableFuture<E> resolveAsync(E requestResponse) {
return null; try {
requestResponse.setResponse(resolve(requestResponse.getRequest()));
return CompletableFuture.completedFuture(requestResponse);
} catch (Exception e) {
final CompletableFuture<E> ret = new CompletableFuture<>();
ret.completeExceptionally(e);
return ret;
}
/* /*
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
requestResponse.setResponse(resolve(requestResponse.getRequest())); requestResponse.setResponse(resolve(requestResponse.getRequest()));
@ -17,7 +24,9 @@ public interface Resolver {
*/ */
} }
Packet resolve(Packet request) throws Exception; default Packet resolve(Packet request) throws Exception {
return resolveAsync(new BaseRequestResponse(request)).get().getResponse();
}
ServiceLoader<Services> services = ServiceLoader.load(Services.class); ServiceLoader<Services> services = ServiceLoader.load(Services.class);