Implement noWaitQueues in redisprocessor, giving the option to instantly check other queues after blocking on the main one

This commit is contained in:
Travis Burtrum 2018-08-09 16:13:20 -04:00
parent 798cc6e7d8
commit e7ec45cd2c
3 changed files with 51 additions and 4 deletions

View File

@ -86,7 +86,11 @@ public class RedisErrorQueueThread extends RedisProcessingQueueThread {
}
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop) {
super(queue, executor, processingQueueSuffix, queuePrefix, pool, stop);
this(queue, executor, errorQueueSuffix, processingQueueSuffix, queuePrefix, pool, stop, null);
}
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop, Iterable<String> noWaitQueues) {
super(queue, executor, processingQueueSuffix, queuePrefix, pool, stop, noWaitQueues);
this.errorQueue = this.queue + defaultIfEmpty(errorQueueSuffix, defaultErrorQueueSuffix);
}

View File

@ -77,12 +77,22 @@ public class RedisProcessingQueueThread extends RedisThread {
}
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop) {
super(queue, executor, queuePrefix, pool, stop);
this(queue, executor, processingQueueSuffix, queuePrefix, pool, stop, null);
}
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop, Iterable<String> noWaitQueues) {
super(queue, executor, queuePrefix, pool, stop, noWaitQueues);
this.processingQueue = this.queue + defaultIfEmpty(processingQueueSuffix, defaultProcessingQueueSuffix);
}
@Override
protected String pollRedis(final Jedis jedis, final int timeout) {
protected String pollRedisNoWait(final Jedis jedis, final String queueName) {
if (debug) System.out.printf("redis> RPOPLPUSH %s %s\n", queueName, processingQueue);
return jedis.rpoplpush(queueName, processingQueue);
}
@Override
protected String pollRedisBlock(final Jedis jedis, final int timeout) {
if (debug) System.out.printf("redis> BRPOPLPUSH %s %s %d\n", queue, processingQueue, timeout);
return jedis.brpoplpush(queue, processingQueue, timeout);
}

View File

@ -29,6 +29,7 @@ import com.moparisthebest.jbgjob.result.PrintStackTraceExecutionResult;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -59,6 +60,7 @@ public class RedisThread extends RedisScheduler implements Runnable {
protected final String queue;
protected final Stop stop;
protected final ScheduledItemExecutor executor;
protected final Iterable<String> noWaitQueues;
public RedisThread() {
this(null, null, (String)null, null);
@ -101,10 +103,24 @@ public class RedisThread extends RedisScheduler implements Runnable {
}
public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool, Stop stop) {
this(queue, executor, queuePrefix, pool, stop, null);
}
public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool, Stop stop, Iterable<String> noWaitQueues) {
super(queuePrefix, pool);
this.queue = this.queuePrefix + defaultIfEmpty(queue, AbstractScheduler.defaultQueue);
this.executor = executor != null ? executor : new ScheduledItemExecutor();
List<String> nwq = null;
if(noWaitQueues != null) {
nwq = new ArrayList<String>();
for(final String q : noWaitQueues)
nwq.add(this.queuePrefix + q);
if(nwq.isEmpty())
nwq = null;
}
this.noWaitQueues = nwq;
if(stop == null){
final String shutdownKey = this.queuePrefix + "shutdown";
stop = new Stop(){
@ -116,13 +132,30 @@ public class RedisThread extends RedisScheduler implements Runnable {
this.stop = stop;
}
protected String pollRedis(final Jedis jedis, final int timeout) {
protected String pollRedisNoWait(final Jedis jedis, final String queueName) {
if (debug) System.out.printf("redis> RPOP %s\n", queueName);
return jedis.rpop(queueName);
}
protected String pollRedisBlock(final Jedis jedis, final int timeout) {
if (debug) System.out.printf("redis> BRPOP %s %d\n", queue, timeout);
final List<String> items = jedis.brpop(timeout, queue);
//System.out.println("items: " + items);
return (items == null || items.size() < 2) ? null : items.get(1);
}
protected String pollRedis(final Jedis jedis, final int timeout) {
String ret = pollRedisBlock(jedis, timeout);
if(ret != null || noWaitQueues == null)
return ret;
for(final String queueName : noWaitQueues) {
ret = pollRedisNoWait(jedis, queueName);
if(ret != null)
return ret;
}
return null;
}
private static final ExecutionResult noop = new PrintStackTraceExecutionResult();
protected ExecutionResult getExecutionResult(final String scheduledItemString) {