From 91aba2d427f4957f3bfa7f38b5286abd520a0550 Mon Sep 17 00:00:00 2001 From: moparisthebest Date: Thu, 22 Aug 2013 10:47:51 -0400 Subject: [PATCH] Add a number of constructors to RedisThread implementations so classes that use them can configure number of threads and such seperatly --- .../jbgjob/AbstractScheduler.java | 10 +++++- .../jbgjob/ScheduledItemExecutor.java | 5 +++ .../jbgjob/pool/ObjectPool.java | 2 +- .../processor/RedisErrorQueueThread.java | 36 +++++++++---------- .../processor/RedisProcessingQueueThread.java | 32 ++++++++--------- .../jbgjob/processor/RedisThread.java | 19 ++++++---- .../moparisthebest/jbgjob/RedisScheduler.java | 16 +++++---- 7 files changed, 71 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/com/moparisthebest/jbgjob/AbstractScheduler.java b/core/src/main/java/com/moparisthebest/jbgjob/AbstractScheduler.java index e41b881..9e578c2 100644 --- a/core/src/main/java/com/moparisthebest/jbgjob/AbstractScheduler.java +++ b/core/src/main/java/com/moparisthebest/jbgjob/AbstractScheduler.java @@ -22,7 +22,15 @@ package com.moparisthebest.jbgjob; public abstract class AbstractScheduler implements Scheduler { - public static final String defaultQueue = System.getProperty("scheduler.default.queue", "default"); + public static final String defaultQueue = defaultIfEmpty(System.getProperty("scheduler.default.queue"), "default"); + + public static boolean isEmpty(final String str){ + return str == null || str.trim().isEmpty(); + } + + public static String defaultIfEmpty(final String str, final String def){ + return isEmpty(str) ? def : str; + } @Override public boolean schedule(final Class> bgClass, final T dto) { diff --git a/core/src/main/java/com/moparisthebest/jbgjob/ScheduledItemExecutor.java b/core/src/main/java/com/moparisthebest/jbgjob/ScheduledItemExecutor.java index 4a7008f..7d7ff77 100644 --- a/core/src/main/java/com/moparisthebest/jbgjob/ScheduledItemExecutor.java +++ b/core/src/main/java/com/moparisthebest/jbgjob/ScheduledItemExecutor.java @@ -49,6 +49,11 @@ public class ScheduledItemExecutor implements Closeable { public ScheduledItemExecutor() { this(true); } + + public ScheduledItemExecutor(final int numThreads) { + this(numThreads, true); + } + public ScheduledItemExecutor(final boolean blockAddWhenSaturated) { this(defaultNumThreads, blockAddWhenSaturated); } diff --git a/core/src/main/java/com/moparisthebest/jbgjob/pool/ObjectPool.java b/core/src/main/java/com/moparisthebest/jbgjob/pool/ObjectPool.java index a9f91cc..e398b6a 100644 --- a/core/src/main/java/com/moparisthebest/jbgjob/pool/ObjectPool.java +++ b/core/src/main/java/com/moparisthebest/jbgjob/pool/ObjectPool.java @@ -62,7 +62,7 @@ public class ObjectPool { throw new NullPointerException("Released resource cannot be null!"); synchronized (pool) { pool.get(resource.getClass()).push(resource); - System.out.println("pool: " + pool); + //System.out.println("pool: " + pool); } } diff --git a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java index 8fef15f..e1a089c 100644 --- a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java +++ b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisErrorQueueThread.java @@ -20,6 +20,7 @@ package com.moparisthebest.jbgjob.processor; +import com.moparisthebest.jbgjob.ScheduledItemExecutor; import com.moparisthebest.jbgjob.result.ExecutionResult; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; @@ -31,38 +32,37 @@ import redis.clients.jedis.JedisPool; */ public class RedisErrorQueueThread extends RedisProcessingQueueThread { - public static final String defaultErrorQueueSuffix; - - static { - final String suffix = System.getProperty("redis.errorQueueSuffix"); - defaultErrorQueueSuffix = (suffix == null || suffix.isEmpty()) ? "-error" : ("-" + suffix); - } + public static final String defaultErrorQueueSuffix = defaultIfEmpty(System.getProperty("redis.errorQueueSuffix"), "-error"); public final String errorQueue; public RedisErrorQueueThread() { - this(defaultErrorQueueSuffix); + this(null, null, null, null, null, null); } - public RedisErrorQueueThread(String errorQueueSuffix) { - this(defaultProcessingQueueSuffix, errorQueueSuffix); + public RedisErrorQueueThread(String queue) { + this(queue, null, null, null, null, null); } - public RedisErrorQueueThread(String processingQueueSuffix, String errorQueueSuffix) { - this(defaultQueue, processingQueueSuffix, errorQueueSuffix); + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor) { + this(queue, executor, null, null, null, null); } - public RedisErrorQueueThread(String queue, String processingQueueSuffix, String errorQueueSuffix) { - this(null, queue, processingQueueSuffix, errorQueueSuffix); + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix) { + this(queue, executor, errorQueueSuffix, null, null, null); } - public RedisErrorQueueThread(JedisPool pool, String queue, String processingQueueSuffix, String errorQueueSuffix) { - this(defaultQueuePrefix, pool, queue, processingQueueSuffix, errorQueueSuffix); + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix) { + this(queue, executor, errorQueueSuffix, processingQueueSuffix, null, null); } - public RedisErrorQueueThread(String queuePrefix, JedisPool pool, String queue, String processingQueueSuffix, String errorQueueSuffix) { - super(queuePrefix, pool, queue, processingQueueSuffix); - this.errorQueue = this.queue + errorQueueSuffix; + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix) { + this(queue, executor, errorQueueSuffix, processingQueueSuffix, queuePrefix, null); + } + + public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool) { + super(queue, executor, processingQueueSuffix, queuePrefix, pool); + this.errorQueue = this.queue + defaultIfEmpty(errorQueueSuffix, defaultErrorQueueSuffix); } protected ExecutionResult getExecutionResult(final String scheduledItemString) { diff --git a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java index 7079583..5d8909f 100644 --- a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java +++ b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisProcessingQueueThread.java @@ -20,6 +20,7 @@ package com.moparisthebest.jbgjob.processor; +import com.moparisthebest.jbgjob.ScheduledItemExecutor; import com.moparisthebest.jbgjob.result.ExecutionResult; import com.moparisthebest.jbgjob.result.PrintStackTraceExecutionResult; import redis.clients.jedis.Jedis; @@ -30,34 +31,33 @@ import redis.clients.jedis.JedisPool; */ public class RedisProcessingQueueThread extends RedisThread { - public static final String defaultProcessingQueueSuffix; - - static { - final String suffix = System.getProperty("redis.processingQueueSuffix"); - defaultProcessingQueueSuffix = (suffix == null || suffix.isEmpty()) ? "-processing" : ("-" + suffix); - } + public static final String defaultProcessingQueueSuffix = defaultIfEmpty(System.getProperty("redis.processingQueueSuffix"), "-processing"); public final String processingQueue; public RedisProcessingQueueThread() { - this(defaultProcessingQueueSuffix); + this(null, null, null, null, null); } - public RedisProcessingQueueThread(String processingQueueSuffix) { - this(defaultQueue, processingQueueSuffix); + public RedisProcessingQueueThread(String queue) { + this(queue, null, null, null, null); } - public RedisProcessingQueueThread(String queue, String processingQueueSuffix) { - this(null, queue, processingQueueSuffix); + public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor) { + this(queue, executor, null, null, null); } - public RedisProcessingQueueThread(JedisPool pool, String queue, String processingQueueSuffix) { - this(defaultQueuePrefix, pool, queue, processingQueueSuffix); + public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix) { + this(queue, executor, processingQueueSuffix, null, null); } - public RedisProcessingQueueThread(String queuePrefix, JedisPool pool, String queue, String processingQueueSuffix) { - super(queuePrefix, pool, queue); - this.processingQueue = this.queue + processingQueueSuffix; + public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix) { + this(queue, executor, processingQueueSuffix, queuePrefix, null); + } + + public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool) { + super(queue, executor, queuePrefix, pool); + this.processingQueue = this.queue + defaultIfEmpty(processingQueueSuffix, defaultProcessingQueueSuffix); } @Override diff --git a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java index 41703c4..4235fd3 100644 --- a/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java +++ b/redisprocessor/src/main/java/com/moparisthebest/jbgjob/processor/RedisThread.java @@ -58,24 +58,29 @@ public class RedisThread extends RedisScheduler implements Runnable { protected final String queue; protected final String shutdownKey; - protected final ScheduledItemExecutor executor = new ScheduledItemExecutor(); + protected final ScheduledItemExecutor executor; public RedisThread() { - this(AbstractScheduler.defaultQueue); + this(null, null, null, null); } public RedisThread(String queue) { - this(null, queue); + this(queue, null, null, null); } - public RedisThread(JedisPool pool, String queue) { - this(defaultQueuePrefix, pool, queue); + public RedisThread(String queue, ScheduledItemExecutor executor) { + this(queue, executor, null, null); } - public RedisThread(String queuePrefix, JedisPool pool, String queue) { + public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix) { + this(queue, executor, queuePrefix, null); + } + + public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool) { super(queuePrefix, pool); - this.queue = this.queuePrefix + queue; + this.queue = this.queuePrefix + defaultIfEmpty(queue, AbstractScheduler.defaultQueue); this.shutdownKey = this.queuePrefix + "shutdown"; + this.executor = executor != null ? executor : new ScheduledItemExecutor(); } protected String pollRedis(final Jedis jedis, final int timeout) { diff --git a/redisscheduler/src/main/java/com/moparisthebest/jbgjob/RedisScheduler.java b/redisscheduler/src/main/java/com/moparisthebest/jbgjob/RedisScheduler.java index fee2fba..7ce966a 100644 --- a/redisscheduler/src/main/java/com/moparisthebest/jbgjob/RedisScheduler.java +++ b/redisscheduler/src/main/java/com/moparisthebest/jbgjob/RedisScheduler.java @@ -36,12 +36,12 @@ public class RedisScheduler extends AbstractScheduler { static { String prefix = System.getProperty("redis.queuePrefix"); - if (prefix == null) + if (isEmpty(prefix)) try { - prefix = java.net.InetAddress.getLocalHost().getHostName(); + prefix = java.net.InetAddress.getLocalHost().getHostName() + "-"; } catch (Throwable e) { } - defaultQueuePrefix = (prefix == null || prefix.isEmpty()) ? "" : (prefix + "-"); + defaultQueuePrefix = defaultIfEmpty(prefix, ""); } protected final String queuePrefix; @@ -50,15 +50,19 @@ public class RedisScheduler extends AbstractScheduler { protected final JedisPool pool; public RedisScheduler() { - this(null); + this(null, null); } public RedisScheduler(JedisPool pool) { - this(defaultQueuePrefix, pool); + this(null, pool); + } + + public RedisScheduler(String queuePrefix) { + this(queuePrefix, null); } public RedisScheduler(String queuePrefix, JedisPool pool) { - this.queuePrefix = queuePrefix; + this.queuePrefix = defaultIfEmpty(queuePrefix, defaultQueuePrefix); this.pool = pool != null ? pool : new JedisPool(new JedisPoolConfig(), System.getProperty("redis.host", "localhost")); }