Add a number of constructors to RedisThread implementations so classes that use them can configure number of threads and such seperatly

This commit is contained in:
moparisthebest 2013-08-22 10:47:51 -04:00
parent ecf55b799c
commit 91aba2d427
7 changed files with 71 additions and 49 deletions

View File

@ -22,7 +22,15 @@ package com.moparisthebest.jbgjob;
public abstract class AbstractScheduler implements Scheduler { public abstract class AbstractScheduler implements Scheduler {
public static final String defaultQueue = System.getProperty("scheduler.default.queue", "default"); public static final String defaultQueue = defaultIfEmpty(System.getProperty("scheduler.default.queue"), "default");
public static boolean isEmpty(final String str){
return str == null || str.trim().isEmpty();
}
public static String defaultIfEmpty(final String str, final String def){
return isEmpty(str) ? def : str;
}
@Override @Override
public <T> boolean schedule(final Class<? extends BackgroundJob<T>> bgClass, final T dto) { public <T> boolean schedule(final Class<? extends BackgroundJob<T>> bgClass, final T dto) {

View File

@ -49,6 +49,11 @@ public class ScheduledItemExecutor implements Closeable {
public ScheduledItemExecutor() { public ScheduledItemExecutor() {
this(true); this(true);
} }
public ScheduledItemExecutor(final int numThreads) {
this(numThreads, true);
}
public ScheduledItemExecutor(final boolean blockAddWhenSaturated) { public ScheduledItemExecutor(final boolean blockAddWhenSaturated) {
this(defaultNumThreads, blockAddWhenSaturated); this(defaultNumThreads, blockAddWhenSaturated);
} }

View File

@ -62,7 +62,7 @@ public class ObjectPool {
throw new NullPointerException("Released resource cannot be null!"); throw new NullPointerException("Released resource cannot be null!");
synchronized (pool) { synchronized (pool) {
pool.get(resource.getClass()).push(resource); pool.get(resource.getClass()).push(resource);
System.out.println("pool: " + pool); //System.out.println("pool: " + pool);
} }
} }

View File

@ -20,6 +20,7 @@
package com.moparisthebest.jbgjob.processor; package com.moparisthebest.jbgjob.processor;
import com.moparisthebest.jbgjob.ScheduledItemExecutor;
import com.moparisthebest.jbgjob.result.ExecutionResult; import com.moparisthebest.jbgjob.result.ExecutionResult;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPool;
@ -31,38 +32,37 @@ import redis.clients.jedis.JedisPool;
*/ */
public class RedisErrorQueueThread extends RedisProcessingQueueThread { public class RedisErrorQueueThread extends RedisProcessingQueueThread {
public static final String defaultErrorQueueSuffix; public static final String defaultErrorQueueSuffix = defaultIfEmpty(System.getProperty("redis.errorQueueSuffix"), "-error");
static {
final String suffix = System.getProperty("redis.errorQueueSuffix");
defaultErrorQueueSuffix = (suffix == null || suffix.isEmpty()) ? "-error" : ("-" + suffix);
}
public final String errorQueue; public final String errorQueue;
public RedisErrorQueueThread() { public RedisErrorQueueThread() {
this(defaultErrorQueueSuffix); this(null, null, null, null, null, null);
} }
public RedisErrorQueueThread(String errorQueueSuffix) { public RedisErrorQueueThread(String queue) {
this(defaultProcessingQueueSuffix, errorQueueSuffix); this(queue, null, null, null, null, null);
} }
public RedisErrorQueueThread(String processingQueueSuffix, String errorQueueSuffix) { public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor) {
this(defaultQueue, processingQueueSuffix, errorQueueSuffix); this(queue, executor, null, null, null, null);
} }
public RedisErrorQueueThread(String queue, String processingQueueSuffix, String errorQueueSuffix) { public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix) {
this(null, queue, processingQueueSuffix, errorQueueSuffix); this(queue, executor, errorQueueSuffix, null, null, null);
} }
public RedisErrorQueueThread(JedisPool pool, String queue, String processingQueueSuffix, String errorQueueSuffix) { public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix) {
this(defaultQueuePrefix, pool, queue, processingQueueSuffix, errorQueueSuffix); this(queue, executor, errorQueueSuffix, processingQueueSuffix, null, null);
} }
public RedisErrorQueueThread(String queuePrefix, JedisPool pool, String queue, String processingQueueSuffix, String errorQueueSuffix) { public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix) {
super(queuePrefix, pool, queue, processingQueueSuffix); this(queue, executor, errorQueueSuffix, processingQueueSuffix, queuePrefix, null);
this.errorQueue = this.queue + errorQueueSuffix; }
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool) {
super(queue, executor, processingQueueSuffix, queuePrefix, pool);
this.errorQueue = this.queue + defaultIfEmpty(errorQueueSuffix, defaultErrorQueueSuffix);
} }
protected ExecutionResult getExecutionResult(final String scheduledItemString) { protected ExecutionResult getExecutionResult(final String scheduledItemString) {

View File

@ -20,6 +20,7 @@
package com.moparisthebest.jbgjob.processor; package com.moparisthebest.jbgjob.processor;
import com.moparisthebest.jbgjob.ScheduledItemExecutor;
import com.moparisthebest.jbgjob.result.ExecutionResult; import com.moparisthebest.jbgjob.result.ExecutionResult;
import com.moparisthebest.jbgjob.result.PrintStackTraceExecutionResult; import com.moparisthebest.jbgjob.result.PrintStackTraceExecutionResult;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
@ -30,34 +31,33 @@ import redis.clients.jedis.JedisPool;
*/ */
public class RedisProcessingQueueThread extends RedisThread { public class RedisProcessingQueueThread extends RedisThread {
public static final String defaultProcessingQueueSuffix; public static final String defaultProcessingQueueSuffix = defaultIfEmpty(System.getProperty("redis.processingQueueSuffix"), "-processing");
static {
final String suffix = System.getProperty("redis.processingQueueSuffix");
defaultProcessingQueueSuffix = (suffix == null || suffix.isEmpty()) ? "-processing" : ("-" + suffix);
}
public final String processingQueue; public final String processingQueue;
public RedisProcessingQueueThread() { public RedisProcessingQueueThread() {
this(defaultProcessingQueueSuffix); this(null, null, null, null, null);
} }
public RedisProcessingQueueThread(String processingQueueSuffix) { public RedisProcessingQueueThread(String queue) {
this(defaultQueue, processingQueueSuffix); this(queue, null, null, null, null);
} }
public RedisProcessingQueueThread(String queue, String processingQueueSuffix) { public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor) {
this(null, queue, processingQueueSuffix); this(queue, executor, null, null, null);
} }
public RedisProcessingQueueThread(JedisPool pool, String queue, String processingQueueSuffix) { public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix) {
this(defaultQueuePrefix, pool, queue, processingQueueSuffix); this(queue, executor, processingQueueSuffix, null, null);
} }
public RedisProcessingQueueThread(String queuePrefix, JedisPool pool, String queue, String processingQueueSuffix) { public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix) {
super(queuePrefix, pool, queue); this(queue, executor, processingQueueSuffix, queuePrefix, null);
this.processingQueue = this.queue + processingQueueSuffix; }
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool) {
super(queue, executor, queuePrefix, pool);
this.processingQueue = this.queue + defaultIfEmpty(processingQueueSuffix, defaultProcessingQueueSuffix);
} }
@Override @Override

View File

@ -58,24 +58,29 @@ public class RedisThread extends RedisScheduler implements Runnable {
protected final String queue; protected final String queue;
protected final String shutdownKey; protected final String shutdownKey;
protected final ScheduledItemExecutor executor = new ScheduledItemExecutor(); protected final ScheduledItemExecutor executor;
public RedisThread() { public RedisThread() {
this(AbstractScheduler.defaultQueue); this(null, null, null, null);
} }
public RedisThread(String queue) { public RedisThread(String queue) {
this(null, queue); this(queue, null, null, null);
} }
public RedisThread(JedisPool pool, String queue) { public RedisThread(String queue, ScheduledItemExecutor executor) {
this(defaultQueuePrefix, pool, queue); this(queue, executor, null, null);
} }
public RedisThread(String queuePrefix, JedisPool pool, String queue) { public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix) {
this(queue, executor, queuePrefix, null);
}
public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool) {
super(queuePrefix, pool); super(queuePrefix, pool);
this.queue = this.queuePrefix + queue; this.queue = this.queuePrefix + defaultIfEmpty(queue, AbstractScheduler.defaultQueue);
this.shutdownKey = this.queuePrefix + "shutdown"; this.shutdownKey = this.queuePrefix + "shutdown";
this.executor = executor != null ? executor : new ScheduledItemExecutor();
} }
protected String pollRedis(final Jedis jedis, final int timeout) { protected String pollRedis(final Jedis jedis, final int timeout) {

View File

@ -36,12 +36,12 @@ public class RedisScheduler extends AbstractScheduler {
static { static {
String prefix = System.getProperty("redis.queuePrefix"); String prefix = System.getProperty("redis.queuePrefix");
if (prefix == null) if (isEmpty(prefix))
try { try {
prefix = java.net.InetAddress.getLocalHost().getHostName(); prefix = java.net.InetAddress.getLocalHost().getHostName() + "-";
} catch (Throwable e) { } catch (Throwable e) {
} }
defaultQueuePrefix = (prefix == null || prefix.isEmpty()) ? "" : (prefix + "-"); defaultQueuePrefix = defaultIfEmpty(prefix, "");
} }
protected final String queuePrefix; protected final String queuePrefix;
@ -50,15 +50,19 @@ public class RedisScheduler extends AbstractScheduler {
protected final JedisPool pool; protected final JedisPool pool;
public RedisScheduler() { public RedisScheduler() {
this(null); this(null, null);
} }
public RedisScheduler(JedisPool pool) { public RedisScheduler(JedisPool pool) {
this(defaultQueuePrefix, pool); this(null, pool);
}
public RedisScheduler(String queuePrefix) {
this(queuePrefix, null);
} }
public RedisScheduler(String queuePrefix, JedisPool pool) { public RedisScheduler(String queuePrefix, JedisPool pool) {
this.queuePrefix = queuePrefix; this.queuePrefix = defaultIfEmpty(queuePrefix, defaultQueuePrefix);
this.pool = pool != null ? pool : new JedisPool(new JedisPoolConfig(), System.getProperty("redis.host", "localhost")); this.pool = pool != null ? pool : new JedisPool(new JedisPoolConfig(), System.getProperty("redis.host", "localhost"));
} }