From 0af73168737771e567eabda0b5bfdd1f13798555 Mon Sep 17 00:00:00 2001 From: moparisthebest Date: Thu, 21 Aug 2014 09:58:59 -0400 Subject: [PATCH] Change shutdownKey in RedisThread to a Stop interface that applications can implement differently, behavior doesn't change and by default goes off the same shutdownKey as before --- .../processor/RedisErrorQueueThread.java | 26 +++++++++- .../processor/RedisProcessingQueueThread.java | 23 ++++++++- .../jbgjob/processor/RedisThread.java | 50 ++++++++++++++++--- 3 files changed, 90 insertions(+), 9 deletions(-) diff --git a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java index e1a089c..9e069fa 100644 --- a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java +++ b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java @@ -40,14 +40,34 @@ public class RedisErrorQueueThread extends RedisProcessingQueueThread { this(null, null, null, null, null, null); } + public RedisErrorQueueThread(Stop stop) { + this(null, null, null, null, null, null, stop); + } + public RedisErrorQueueThread(String queue) { this(queue, null, null, null, null, null); } + public RedisErrorQueueThread(String queue, Stop stop) { + this(queue, null, null, null, null, null, stop); + } + + public RedisErrorQueueThread(JedisPool pool, Stop stop) { + this(null, null, null, null, null, pool, stop); + } + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor) { this(queue, executor, null, null, null, null); } + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, Stop stop) { + this(queue, executor, null, null, null, null, stop); + } + + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, JedisPool pool, Stop stop) { + this(queue, executor, null, null, null, pool, stop); + } + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix) { this(queue, executor, errorQueueSuffix, null, null, null); } @@ -61,7 +81,11 @@ public class RedisErrorQueueThread extends RedisProcessingQueueThread { } public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool) { - super(queue, executor, processingQueueSuffix, queuePrefix, pool); + this(queue, executor, errorQueueSuffix, processingQueueSuffix, queuePrefix, pool, null); + } + + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop) { + super(queue, executor, processingQueueSuffix, queuePrefix, pool, stop); this.errorQueue = this.queue + defaultIfEmpty(errorQueueSuffix, defaultErrorQueueSuffix); } diff --git a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java index 5d8909f..acd1ef4 100644 --- a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java +++ b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java @@ -38,15 +38,32 @@ public class RedisProcessingQueueThread extends RedisThread { public RedisProcessingQueueThread() { this(null, null, null, null, null); } + public RedisProcessingQueueThread(Stop stop) { + this(null, null, null, null, null, stop); + } public RedisProcessingQueueThread(String queue) { this(queue, null, null, null, null); } + public RedisProcessingQueueThread(String queue, Stop stop) { + this(queue, null, null, null, null, stop); + } + + public RedisProcessingQueueThread(JedisPool pool, Stop stop) { + this(null, null, null, null, pool, stop); + } public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor) { this(queue, executor, null, null, null); } + public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, Stop stop) { + this(queue, executor, null, null, null, stop); + } + public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, JedisPool pool, Stop stop) { + this(queue, executor, null, null, pool, stop); + } + public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix) { this(queue, executor, processingQueueSuffix, null, null); } @@ -56,7 +73,11 @@ public class RedisProcessingQueueThread extends RedisThread { } public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool) { - super(queue, executor, queuePrefix, pool); + this(queue, executor, processingQueueSuffix, queuePrefix, pool, null); + } + + public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop) { + super(queue, executor, queuePrefix, pool, stop); this.processingQueue = this.queue + defaultIfEmpty(processingQueueSuffix, defaultProcessingQueueSuffix); } diff --git a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java index 4235fd3..c8842ef 100644 --- a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java +++ b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java @@ -57,19 +57,39 @@ public class RedisThread extends RedisScheduler implements Runnable { private int timeoutCounter = 0; protected final String queue; - protected final String shutdownKey; + protected final Stop stop; protected final ScheduledItemExecutor executor; public RedisThread() { - this(null, null, null, null); + this(null, null, (String)null, null); + } + + public RedisThread(Stop stop) { + this(null, null, null, null, stop); } public RedisThread(String queue) { - this(queue, null, null, null); + this(queue, null, (String)null, null); + } + + public RedisThread(String queue, Stop stop) { + this(queue, null, null, null, stop); + } + + public RedisThread(JedisPool pool, Stop stop) { + this(null, null, null, pool, stop); } public RedisThread(String queue, ScheduledItemExecutor executor) { - this(queue, executor, null, null); + this(queue, executor, (String)null, null); + } + + public RedisThread(String queue, ScheduledItemExecutor executor, Stop stop) { + this(queue, executor, null, null, stop); + } + + public RedisThread(String queue, ScheduledItemExecutor executor, JedisPool pool, Stop stop) { + this(queue, executor, null, pool, stop); } public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix) { @@ -77,10 +97,23 @@ public class RedisThread extends RedisScheduler implements Runnable { } public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool) { + this(queue, executor, queuePrefix, pool, null); + } + + public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool, Stop stop) { super(queuePrefix, pool); this.queue = this.queuePrefix + defaultIfEmpty(queue, AbstractScheduler.defaultQueue); - this.shutdownKey = this.queuePrefix + "shutdown"; this.executor = executor != null ? executor : new ScheduledItemExecutor(); + + if(stop == null){ + final String shutdownKey = this.queuePrefix + "shutdown"; + stop = new Stop(){ + public boolean stop(final Jedis jedis){ + return "shutdown".equals(jedis.get(shutdownKey)); + } + }; + } + this.stop = stop; } protected String pollRedis(final Jedis jedis, final int timeout) { @@ -96,7 +129,6 @@ public class RedisThread extends RedisScheduler implements Runnable { return noop; } - @Override public final void run() { Jedis jedis = null; outer: @@ -106,7 +138,7 @@ public class RedisThread extends RedisScheduler implements Runnable { while (true) { if (debug && maxTimeoutsBeforeClose > 0) System.out.printf("maxTimeoutsBeforeClose: %d timeoutCounter: %d\n", maxTimeoutsBeforeClose, timeoutCounter); // check to see if we should shutdown - if ("shutdown".equals(jedis.get(this.shutdownKey))) + if (this.stop.stop(jedis)) break outer; // grab an item, if it's null (probably timed out) try again final String scheduledItemString = pollRedis(jedis, defaultTimeout); @@ -175,4 +207,8 @@ public class RedisThread extends RedisScheduler implements Runnable { // set all needed arguments with system properties new RedisThread().run(); } + + public interface Stop { + public boolean stop(final Jedis jedis); + } }