From e7ec45cd2c3c07d77597d6c7e97179b66a007fa0 Mon Sep 17 00:00:00 2001 From: moparisthebest Date: Thu, 9 Aug 2018 16:13:20 -0400 Subject: [PATCH] Implement noWaitQueues in redisprocessor, giving the option to instantly check other queues after blocking on the main one --- .../processor/RedisErrorQueueThread.java | 6 +++- .../processor/RedisProcessingQueueThread.java | 14 ++++++-- .../jbgjob/processor/RedisThread.java | 35 ++++++++++++++++++- 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java index c397ce8..579a04d 100644 --- a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java +++ b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java @@ -86,7 +86,11 @@ public class RedisErrorQueueThread extends RedisProcessingQueueThread { } public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop) { - super(queue, executor, processingQueueSuffix, queuePrefix, pool, stop); + this(queue, executor, errorQueueSuffix, processingQueueSuffix, queuePrefix, pool, stop, null); + } + + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop, Iterable noWaitQueues) { + super(queue, executor, processingQueueSuffix, queuePrefix, pool, stop, noWaitQueues); this.errorQueue = this.queue + defaultIfEmpty(errorQueueSuffix, defaultErrorQueueSuffix); } diff --git a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java index acd1ef4..1498c7d 100644 --- a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java +++ b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java @@ -77,12 +77,22 @@ public class RedisProcessingQueueThread extends RedisThread { } public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop) { - super(queue, executor, queuePrefix, pool, stop); + this(queue, executor, processingQueueSuffix, queuePrefix, pool, stop, null); + } + + public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop, Iterable noWaitQueues) { + super(queue, executor, queuePrefix, pool, stop, noWaitQueues); this.processingQueue = this.queue + defaultIfEmpty(processingQueueSuffix, defaultProcessingQueueSuffix); } @Override - protected String pollRedis(final Jedis jedis, final int timeout) { + protected String pollRedisNoWait(final Jedis jedis, final String queueName) { + if (debug) System.out.printf("redis> RPOPLPUSH %s %s\n", queueName, processingQueue); + return jedis.rpoplpush(queueName, processingQueue); + } + + @Override + protected String pollRedisBlock(final Jedis jedis, final int timeout) { if (debug) System.out.printf("redis> BRPOPLPUSH %s %s %d\n", queue, processingQueue, timeout); return jedis.brpoplpush(queue, processingQueue, timeout); } diff --git a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java index c8842ef..b1a18b0 100644 --- a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java +++ b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java @@ -29,6 +29,7 @@ import com.moparisthebest.jbgjob.result.PrintStackTraceExecutionResult; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -59,6 +60,7 @@ public class RedisThread extends RedisScheduler implements Runnable { protected final String queue; protected final Stop stop; protected final ScheduledItemExecutor executor; + protected final Iterable noWaitQueues; public RedisThread() { this(null, null, (String)null, null); @@ -101,10 +103,24 @@ public class RedisThread extends RedisScheduler implements Runnable { } public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool, Stop stop) { + this(queue, executor, queuePrefix, pool, stop, null); + } + + public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool, Stop stop, Iterable noWaitQueues) { super(queuePrefix, pool); this.queue = this.queuePrefix + defaultIfEmpty(queue, AbstractScheduler.defaultQueue); this.executor = executor != null ? executor : new ScheduledItemExecutor(); + List nwq = null; + if(noWaitQueues != null) { + nwq = new ArrayList(); + for(final String q : noWaitQueues) + nwq.add(this.queuePrefix + q); + if(nwq.isEmpty()) + nwq = null; + } + this.noWaitQueues = nwq; + if(stop == null){ final String shutdownKey = this.queuePrefix + "shutdown"; stop = new Stop(){ @@ -116,13 +132,30 @@ public class RedisThread extends RedisScheduler implements Runnable { this.stop = stop; } - protected String pollRedis(final Jedis jedis, final int timeout) { + protected String pollRedisNoWait(final Jedis jedis, final String queueName) { + if (debug) System.out.printf("redis> RPOP %s\n", queueName); + return jedis.rpop(queueName); + } + + protected String pollRedisBlock(final Jedis jedis, final int timeout) { if (debug) System.out.printf("redis> BRPOP %s %d\n", queue, timeout); final List items = jedis.brpop(timeout, queue); //System.out.println("items: " + items); return (items == null || items.size() < 2) ? null : items.get(1); } + protected String pollRedis(final Jedis jedis, final int timeout) { + String ret = pollRedisBlock(jedis, timeout); + if(ret != null || noWaitQueues == null) + return ret; + for(final String queueName : noWaitQueues) { + ret = pollRedisNoWait(jedis, queueName); + if(ret != null) + return ret; + } + return null; + } + private static final ExecutionResult noop = new PrintStackTraceExecutionResult(); protected ExecutionResult getExecutionResult(final String scheduledItemString) {