Change shutdownKey in RedisThread to a Stop interface that applications can implement differently, behavior doesn't change and by default goes off the same shutdownKey as before

This commit is contained in:
moparisthebest 2014-08-21 09:58:59 -04:00
parent d9415ef178
commit 0af7316873
3 changed files with 90 additions and 9 deletions
redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor

View File

@ -40,14 +40,34 @@ public class RedisErrorQueueThread extends RedisProcessingQueueThread {
this(null, null, null, null, null, null); this(null, null, null, null, null, null);
} }
public RedisErrorQueueThread(Stop stop) {
this(null, null, null, null, null, null, stop);
}
public RedisErrorQueueThread(String queue) { public RedisErrorQueueThread(String queue) {
this(queue, null, null, null, null, null); this(queue, null, null, null, null, null);
} }
public RedisErrorQueueThread(String queue, Stop stop) {
this(queue, null, null, null, null, null, stop);
}
public RedisErrorQueueThread(JedisPool pool, Stop stop) {
this(null, null, null, null, null, pool, stop);
}
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor) { public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor) {
this(queue, executor, null, null, null, null); this(queue, executor, null, null, null, null);
} }
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, Stop stop) {
this(queue, executor, null, null, null, null, stop);
}
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, JedisPool pool, Stop stop) {
this(queue, executor, null, null, null, pool, stop);
}
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix) { public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix) {
this(queue, executor, errorQueueSuffix, null, null, null); this(queue, executor, errorQueueSuffix, null, null, null);
} }
@ -61,7 +81,11 @@ public class RedisErrorQueueThread extends RedisProcessingQueueThread {
} }
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool) { public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool) {
super(queue, executor, processingQueueSuffix, queuePrefix, pool); this(queue, executor, errorQueueSuffix, processingQueueSuffix, queuePrefix, pool, null);
}
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop) {
super(queue, executor, processingQueueSuffix, queuePrefix, pool, stop);
this.errorQueue = this.queue + defaultIfEmpty(errorQueueSuffix, defaultErrorQueueSuffix); this.errorQueue = this.queue + defaultIfEmpty(errorQueueSuffix, defaultErrorQueueSuffix);
} }

View File

@ -38,15 +38,32 @@ public class RedisProcessingQueueThread extends RedisThread {
public RedisProcessingQueueThread() { public RedisProcessingQueueThread() {
this(null, null, null, null, null); this(null, null, null, null, null);
} }
public RedisProcessingQueueThread(Stop stop) {
this(null, null, null, null, null, stop);
}
public RedisProcessingQueueThread(String queue) { public RedisProcessingQueueThread(String queue) {
this(queue, null, null, null, null); this(queue, null, null, null, null);
} }
public RedisProcessingQueueThread(String queue, Stop stop) {
this(queue, null, null, null, null, stop);
}
public RedisProcessingQueueThread(JedisPool pool, Stop stop) {
this(null, null, null, null, pool, stop);
}
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor) { public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor) {
this(queue, executor, null, null, null); this(queue, executor, null, null, null);
} }
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, Stop stop) {
this(queue, executor, null, null, null, stop);
}
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, JedisPool pool, Stop stop) {
this(queue, executor, null, null, pool, stop);
}
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix) { public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix) {
this(queue, executor, processingQueueSuffix, null, null); this(queue, executor, processingQueueSuffix, null, null);
} }
@ -56,7 +73,11 @@ public class RedisProcessingQueueThread extends RedisThread {
} }
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool) { public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool) {
super(queue, executor, queuePrefix, pool); this(queue, executor, processingQueueSuffix, queuePrefix, pool, null);
}
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop) {
super(queue, executor, queuePrefix, pool, stop);
this.processingQueue = this.queue + defaultIfEmpty(processingQueueSuffix, defaultProcessingQueueSuffix); this.processingQueue = this.queue + defaultIfEmpty(processingQueueSuffix, defaultProcessingQueueSuffix);
} }

View File

@ -57,19 +57,39 @@ public class RedisThread extends RedisScheduler implements Runnable {
private int timeoutCounter = 0; private int timeoutCounter = 0;
protected final String queue; protected final String queue;
protected final String shutdownKey; protected final Stop stop;
protected final ScheduledItemExecutor executor; protected final ScheduledItemExecutor executor;
public RedisThread() { public RedisThread() {
this(null, null, null, null); this(null, null, (String)null, null);
}
public RedisThread(Stop stop) {
this(null, null, null, null, stop);
} }
public RedisThread(String queue) { public RedisThread(String queue) {
this(queue, null, null, null); this(queue, null, (String)null, null);
}
public RedisThread(String queue, Stop stop) {
this(queue, null, null, null, stop);
}
public RedisThread(JedisPool pool, Stop stop) {
this(null, null, null, pool, stop);
} }
public RedisThread(String queue, ScheduledItemExecutor executor) { public RedisThread(String queue, ScheduledItemExecutor executor) {
this(queue, executor, null, null); this(queue, executor, (String)null, null);
}
public RedisThread(String queue, ScheduledItemExecutor executor, Stop stop) {
this(queue, executor, null, null, stop);
}
public RedisThread(String queue, ScheduledItemExecutor executor, JedisPool pool, Stop stop) {
this(queue, executor, null, pool, stop);
} }
public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix) { public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix) {
@ -77,10 +97,23 @@ public class RedisThread extends RedisScheduler implements Runnable {
} }
public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool) { public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool) {
this(queue, executor, queuePrefix, pool, null);
}
public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool, Stop stop) {
super(queuePrefix, pool); super(queuePrefix, pool);
this.queue = this.queuePrefix + defaultIfEmpty(queue, AbstractScheduler.defaultQueue); this.queue = this.queuePrefix + defaultIfEmpty(queue, AbstractScheduler.defaultQueue);
this.shutdownKey = this.queuePrefix + "shutdown";
this.executor = executor != null ? executor : new ScheduledItemExecutor(); this.executor = executor != null ? executor : new ScheduledItemExecutor();
if(stop == null){
final String shutdownKey = this.queuePrefix + "shutdown";
stop = new Stop(){
public boolean stop(final Jedis jedis){
return "shutdown".equals(jedis.get(shutdownKey));
}
};
}
this.stop = stop;
} }
protected String pollRedis(final Jedis jedis, final int timeout) { protected String pollRedis(final Jedis jedis, final int timeout) {
@ -96,7 +129,6 @@ public class RedisThread extends RedisScheduler implements Runnable {
return noop; return noop;
} }
@Override
public final void run() { public final void run() {
Jedis jedis = null; Jedis jedis = null;
outer: outer:
@ -106,7 +138,7 @@ public class RedisThread extends RedisScheduler implements Runnable {
while (true) { while (true) {
if (debug && maxTimeoutsBeforeClose > 0) System.out.printf("maxTimeoutsBeforeClose: %d timeoutCounter: %d\n", maxTimeoutsBeforeClose, timeoutCounter); if (debug && maxTimeoutsBeforeClose > 0) System.out.printf("maxTimeoutsBeforeClose: %d timeoutCounter: %d\n", maxTimeoutsBeforeClose, timeoutCounter);
// check to see if we should shutdown // check to see if we should shutdown
if ("shutdown".equals(jedis.get(this.shutdownKey))) if (this.stop.stop(jedis))
break outer; break outer;
// grab an item, if it's null (probably timed out) try again // grab an item, if it's null (probably timed out) try again
final String scheduledItemString = pollRedis(jedis, defaultTimeout); final String scheduledItemString = pollRedis(jedis, defaultTimeout);
@ -175,4 +207,8 @@ public class RedisThread extends RedisScheduler implements Runnable {
// set all needed arguments with system properties // set all needed arguments with system properties
new RedisThread().run(); new RedisThread().run();
} }
public interface Stop {
public boolean stop(final Jedis jedis);
}
} }