Compare commits

...

9 Commits

12 changed files with 212 additions and 51 deletions

42
.ci/Jenkinsfile vendored Normal file
View File

@ -0,0 +1,42 @@
properties(
[
disableConcurrentBuilds()
]
)
node('linux && docker') {
try {
stage('Checkout') {
//branch name from Jenkins environment variables
echo "My branch is: ${env.BRANCH_NAME}"
// this doesn't grab tags pointing to this branch
//checkout scm
// this hack does... https://issues.jenkins.io/browse/JENKINS-45164
checkout([
$class: 'GitSCM',
branches: [[name: 'refs/heads/'+env.BRANCH_NAME]],
extensions: [[$class: 'CloneOption', noTags: false, shallow: false, depth: 0, reference: '']],
userRemoteConfigs: scm.userRemoteConfigs,
])
sh '''
set -euxo pipefail
git checkout "$BRANCH_NAME" --
git reset --hard "origin/$BRANCH_NAME"
'''
}
stage('Build + Deploy') {
sh 'curl --compressed -sL https://code.moparisthebest.com/moparisthebest/self-ci/raw/branch/master/build-ci.sh | bash'
}
currentBuild.result = 'SUCCESS'
} catch (Exception err) {
currentBuild.result = 'FAILURE'
} finally {
stage('Email') {
step([$class: 'Mailer', notifyEveryUnstableBuild: true, recipients: 'admin.jenkins@moparisthebest.com', sendToIndividuals: true])
}
deleteDir()
}
}

25
.ci/build.sh Executable file
View File

@ -0,0 +1,25 @@
#!/bin/bash
set -euxo pipefail
[ $JAVA_VERSION -lt 8 ] && echo "build does not support JAVA_VERSION: $JAVA_VERSION" && exit 0
echo "starting build for JAVA_VERSION: $JAVA_VERSION"
# install deps
mvn install -DskipTests=true -Dmaven.javadoc.skip=true -B -V
# clean and test
mvn clean test -B
# publish only from java 8 and master branch
if [ "$BRANCH_NAME" == "master" -a $JAVA_VERSION -eq 8 ]
then
echo 'deploying to maven'
mvn deploy -Dmaven.test.skip=true -B
mkdir -p release
find -type f -name '*.jar' -print0 | xargs -0n1 -I {} mv '{}' 'release/'
fi
echo 'build success!'
exit 0

View File

@ -23,7 +23,7 @@
<parent>
<groupId>com.moparisthebest.jbgjob</groupId>
<artifactId>jbgjob</artifactId>
<version>0.3.3</version>
<version>0.3.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>core</artifactId>

View File

@ -28,7 +28,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.moparisthebest.jbgjob</groupId>
<artifactId>jbgjob</artifactId>
<version>0.3.3</version>
<version>0.3.5-SNAPSHOT</version>
<name>jbgjob</name>
<description>
jBgJob (Java Background Job) lets you schedule Java jobs to be ran in the background. They can run in any
@ -85,8 +85,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.5</source>
<target>1.5</target>
<source>1.7</source>
<target>1.7</target>
<debug>false</debug>
</configuration>
</plugin>

View File

@ -23,7 +23,7 @@
<parent>
<groupId>com.moparisthebest.jbgjob</groupId>
<artifactId>jbgjob</artifactId>
<version>0.3.3</version>
<version>0.3.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>redisprocessor</artifactId>

View File

@ -86,7 +86,11 @@ public class RedisErrorQueueThread extends RedisProcessingQueueThread {
}
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop) {
super(queue, executor, processingQueueSuffix, queuePrefix, pool, stop);
this(queue, executor, errorQueueSuffix, processingQueueSuffix, queuePrefix, pool, stop, null);
}
public RedisErrorQueueThread(String queue, ScheduledItemExecutor executor, String errorQueueSuffix, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop, Iterable<String> noWaitQueues) {
super(queue, executor, processingQueueSuffix, queuePrefix, pool, stop, noWaitQueues);
this.errorQueue = this.queue + defaultIfEmpty(errorQueueSuffix, defaultErrorQueueSuffix);
}

View File

@ -77,12 +77,22 @@ public class RedisProcessingQueueThread extends RedisThread {
}
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop) {
super(queue, executor, queuePrefix, pool, stop);
this(queue, executor, processingQueueSuffix, queuePrefix, pool, stop, null);
}
public RedisProcessingQueueThread(String queue, ScheduledItemExecutor executor, String processingQueueSuffix, String queuePrefix, JedisPool pool, Stop stop, Iterable<String> noWaitQueues) {
super(queue, executor, queuePrefix, pool, stop, noWaitQueues);
this.processingQueue = this.queue + defaultIfEmpty(processingQueueSuffix, defaultProcessingQueueSuffix);
}
@Override
protected String pollRedis(final Jedis jedis, final int timeout) {
protected String pollRedisNoWait(final Jedis jedis, final String queueName) {
if (debug) System.out.printf("redis> RPOPLPUSH %s %s\n", queueName, processingQueue);
return jedis.rpoplpush(queueName, processingQueue);
}
@Override
protected String pollRedisBlock(final Jedis jedis, final int timeout) {
if (debug) System.out.printf("redis> BRPOPLPUSH %s %s %d\n", queue, processingQueue, timeout);
return jedis.brpoplpush(queue, processingQueue, timeout);
}

View File

@ -29,6 +29,7 @@ import com.moparisthebest.jbgjob.result.PrintStackTraceExecutionResult;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -59,6 +60,7 @@ public class RedisThread extends RedisScheduler implements Runnable {
protected final String queue;
protected final Stop stop;
protected final ScheduledItemExecutor executor;
protected final Iterable<String> noWaitQueues;
public RedisThread() {
this(null, null, (String)null, null);
@ -101,10 +103,24 @@ public class RedisThread extends RedisScheduler implements Runnable {
}
public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool, Stop stop) {
this(queue, executor, queuePrefix, pool, stop, null);
}
public RedisThread(String queue, ScheduledItemExecutor executor, String queuePrefix, JedisPool pool, Stop stop, Iterable<String> noWaitQueues) {
super(queuePrefix, pool);
this.queue = this.queuePrefix + defaultIfEmpty(queue, AbstractScheduler.defaultQueue);
this.executor = executor != null ? executor : new ScheduledItemExecutor();
List<String> nwq = null;
if(noWaitQueues != null) {
nwq = new ArrayList<String>();
for(final String q : noWaitQueues)
nwq.add(this.queuePrefix + q);
if(nwq.isEmpty())
nwq = null;
}
this.noWaitQueues = nwq;
if(stop == null){
final String shutdownKey = this.queuePrefix + "shutdown";
stop = new Stop(){
@ -116,13 +132,30 @@ public class RedisThread extends RedisScheduler implements Runnable {
this.stop = stop;
}
protected String pollRedis(final Jedis jedis, final int timeout) {
protected String pollRedisNoWait(final Jedis jedis, final String queueName) {
if (debug) System.out.printf("redis> RPOP %s\n", queueName);
return jedis.rpop(queueName);
}
protected String pollRedisBlock(final Jedis jedis, final int timeout) {
if (debug) System.out.printf("redis> BRPOP %s %d\n", queue, timeout);
final List<String> items = jedis.brpop(timeout, queue);
//System.out.println("items: " + items);
return (items == null || items.size() < 2) ? null : items.get(1);
}
protected String pollRedis(final Jedis jedis, final int timeout) {
String ret = pollRedisBlock(jedis, timeout);
if(ret != null || noWaitQueues == null)
return ret;
for(final String queueName : noWaitQueues) {
ret = pollRedisNoWait(jedis, queueName);
if(ret != null)
return ret;
}
return null;
}
private static final ExecutionResult noop = new PrintStackTraceExecutionResult();
protected ExecutionResult getExecutionResult(final String scheduledItemString) {

View File

@ -23,7 +23,7 @@
<parent>
<groupId>com.moparisthebest.jbgjob</groupId>
<artifactId>jbgjob</artifactId>
<version>0.3.3</version>
<version>0.3.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>redisscheduler</artifactId>
@ -41,7 +41,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.0</version>
<version>2.9.9</version>
</dependency>
</dependencies>
<profiles>

View File

@ -39,8 +39,6 @@ public class RedisScheduler extends AbstractScheduler {
public static final String defaultQueuePrefix;
public static final Module redisModule;
static {
String prefix = System.getProperty("redis.queuePrefix");
if (isEmpty(prefix))
@ -49,47 +47,11 @@ public class RedisScheduler extends AbstractScheduler {
} catch (Throwable e) {
}
defaultQueuePrefix = defaultIfEmpty(prefix, "");
@SuppressWarnings({"unchecked"})
final Class<Set> singleton = (Class<Set>) Collections.singleton(5L).getClass();
@SuppressWarnings({"unchecked"})
final Class<Map> singletonMap = (Class<Map>) Collections.singletonMap(5L, 5L).getClass();
@SuppressWarnings({"unchecked"})
final Class<List> singletonList = (Class<List>) Collections.singletonList(5L).getClass();
@SuppressWarnings({"unchecked"})
final Class<List> asList = (Class<List>) Arrays.asList(5L).getClass();
redisModule = new SimpleModule("OrderedMap", new Version(1, 0, 0, null, null, null)).addDeserializer(
singleton, new JsonDeserializer<Set>() {
@Override
public Set deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
return Collections.singleton(jp.getCodec().readValue(jp, Object[].class)[0]);
}
}).addDeserializer(
singletonMap, new JsonDeserializer<Map>() {
@Override
public Map deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
final Map.Entry entry = (Map.Entry) jp.getCodec().readValue(jp, HashMap.class).entrySet().iterator().next();
return Collections.singletonMap(entry.getKey(), entry.getValue());
}
}).addDeserializer(
singletonList, new JsonDeserializer<List>() {
@Override
public List deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
return Collections.singletonList(jp.getCodec().readValue(jp, Object[].class)[0]);
}
}).addDeserializer(
asList, new JsonDeserializer<List>() {
@Override
public List deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
return Arrays.asList(jp.getCodec().readValue(jp, Object[].class));
}
});
}
protected final String queuePrefix;
protected final ObjectMapper om = new ObjectMapper().enableDefaultTyping().registerModule(redisModule);
protected final ObjectMapper om = new ObjectMapper().enableDefaultTyping();
protected final JedisPool pool;
public RedisScheduler() {

View File

@ -21,6 +21,9 @@
package com.moparisthebest.jbgjob;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.*;
public class RedisSchedulerTest extends AbstractSchedulerTests {
@ -28,4 +31,86 @@ public class RedisSchedulerTest extends AbstractSchedulerTests {
public static void setUp() throws Throwable {
bg = new RedisScheduler();
}
@Test
public void testSerialization() throws Throwable {
bg.testSerialization(new JacksonTest(arrayList(new InList(1L))));
bg.testSerialization(new JacksonTest(Arrays.asList(new InList(1L))));
bg.testSerialization(new JacksonTest(Collections.singletonList(new InList(1L))));
bg.testSerialization(new JacksonTest(Collections.singleton(new InList(1L))));
bg.testSerialization(new JacksonTestMap(Collections.singletonMap(1L, new InList(1L))));
}
public static <T> List<T> arrayList(T o) {
final List<T> ret = new ArrayList<>();
ret.add(o);
return ret;
}
public static class InList {
private Long a;
public InList(Long a) {
this.a = a;
}
public InList() {
}
public Long getA() {
return a;
}
public void setA(final Long a) {
this.a = a;
}
@Override
public String toString() {
return "InList{" +
"a=" + a +
'}';
}
}
public static class JacksonTest {
public final Collection<InList> inLists;
public JacksonTest() {
this(null);
}
public JacksonTest(final Collection<InList> inLists) {
this.inLists = inLists;
}
@Override
public String toString() {
return "JacksonTest{" +
"inLists=" + inLists +
'}';
}
}
public static class JacksonTestMap {
public final Map<Long, InList> inLists;
public JacksonTestMap() {
this(null);
}
public JacksonTestMap(final Map<Long, InList> inLists) {
this.inLists = inLists;
}
@Override
public String toString() {
return "JacksonTestMap{" +
"inLists=" + inLists +
'}';
}
}
}

View File

@ -23,7 +23,7 @@
<parent>
<groupId>com.moparisthebest.jbgjob</groupId>
<artifactId>jbgjob</artifactId>
<version>0.3.3</version>
<version>0.3.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>threadscheduler</artifactId>