QueryRunner.runRetry and runRetryFuture methods
This commit is contained in:
parent
a22109f31c
commit
8400ed3d00
|
@ -3,6 +3,7 @@ package com.moparisthebest.jdbc;
|
||||||
import com.moparisthebest.jdbc.codegen.JdbcMapper;
|
import com.moparisthebest.jdbc.codegen.JdbcMapper;
|
||||||
|
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
import static com.moparisthebest.jdbc.TryClose.tryClose;
|
import static com.moparisthebest.jdbc.TryClose.tryClose;
|
||||||
|
|
||||||
|
@ -12,16 +13,56 @@ import static com.moparisthebest.jdbc.TryClose.tryClose;
|
||||||
*/
|
*/
|
||||||
public class QueryRunner<T extends JdbcMapper> {
|
public class QueryRunner<T extends JdbcMapper> {
|
||||||
|
|
||||||
|
private static final int defaultRetryCount = 10;
|
||||||
|
private static final DelayStrategy defaultDelayStrategy = new ExponentialBackoff();
|
||||||
|
private static final ExecutorService defaultExecutorService = Executors.newCachedThreadPool(); // todo: good or bad default?
|
||||||
|
|
||||||
private final Factory<T> factory;
|
private final Factory<T> factory;
|
||||||
|
private final DelayStrategy delayStrategy;
|
||||||
|
private final int retryCount;
|
||||||
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
|
public QueryRunner(final Factory<T> factory, final DelayStrategy delayStrategy, final ExecutorService executorService, final int retryCount) {
|
||||||
|
if (factory == null)
|
||||||
|
throw new NullPointerException("factory must be non-null");
|
||||||
|
if (delayStrategy == null)
|
||||||
|
throw new NullPointerException("delayStrategy must be non-null");
|
||||||
|
if (executorService == null)
|
||||||
|
throw new NullPointerException("executorService must be non-null");
|
||||||
|
if (retryCount < 1)
|
||||||
|
throw new IllegalArgumentException("retryCount must be > 0");
|
||||||
|
this.factory = factory;
|
||||||
|
this.delayStrategy = delayStrategy;
|
||||||
|
this.retryCount = retryCount;
|
||||||
|
this.executorService = Executors.newSingleThreadExecutor();
|
||||||
|
}
|
||||||
|
|
||||||
public QueryRunner(final Factory<T> factory) {
|
public QueryRunner(final Factory<T> factory) {
|
||||||
if(factory == null)
|
this(factory, defaultDelayStrategy, defaultExecutorService, defaultRetryCount);
|
||||||
throw new NullPointerException("factory must be non-null");
|
}
|
||||||
this.factory = factory;
|
|
||||||
|
public QueryRunner(final Factory<T> factory, final DelayStrategy delayStrategy) {
|
||||||
|
this(factory, delayStrategy, defaultExecutorService, defaultRetryCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueryRunner(final Factory<T> factory, final int retryCount) {
|
||||||
|
this(factory, defaultDelayStrategy, defaultExecutorService, retryCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueryRunner(final Factory<T> factory, final ExecutorService executorService) {
|
||||||
|
this(factory, defaultDelayStrategy, executorService, defaultRetryCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueryRunner(final Factory<T> factory, final DelayStrategy delayStrategy, final ExecutorService executorService) {
|
||||||
|
this(factory, delayStrategy, executorService, defaultRetryCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueryRunner(final Factory<T> factory, final ExecutorService executorService, final int retryCount) {
|
||||||
|
this(factory, defaultDelayStrategy, executorService, retryCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
public <E> E run(final Runner<T, E> query) throws SQLException {
|
public <E> E run(final Runner<T, E> query) throws SQLException {
|
||||||
if(query == null)
|
if (query == null)
|
||||||
throw new NullPointerException("query must be non-null");
|
throw new NullPointerException("query must be non-null");
|
||||||
T dao = null;
|
T dao = null;
|
||||||
try {
|
try {
|
||||||
|
@ -33,7 +74,7 @@ public class QueryRunner<T extends JdbcMapper> {
|
||||||
}
|
}
|
||||||
|
|
||||||
public <E> E runInTransaction(final Runner<T, E> query) throws SQLException {
|
public <E> E runInTransaction(final Runner<T, E> query) throws SQLException {
|
||||||
if(query == null)
|
if (query == null)
|
||||||
throw new NullPointerException("query must be non-null");
|
throw new NullPointerException("query must be non-null");
|
||||||
T dao = null;
|
T dao = null;
|
||||||
try {
|
try {
|
||||||
|
@ -46,20 +87,20 @@ public class QueryRunner<T extends JdbcMapper> {
|
||||||
if (dao != null) {
|
if (dao != null) {
|
||||||
try {
|
try {
|
||||||
dao.getConnection().rollback();
|
dao.getConnection().rollback();
|
||||||
} catch(SQLException excep) {
|
} catch (SQLException excep) {
|
||||||
// ignore to throw original
|
// ignore to throw original
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(e instanceof SQLException)
|
if (e instanceof SQLException)
|
||||||
throw (SQLException) e;
|
throw (SQLException) e;
|
||||||
if(e instanceof RuntimeException)
|
if (e instanceof RuntimeException)
|
||||||
throw (RuntimeException) e;
|
throw (RuntimeException) e;
|
||||||
throw new RuntimeException("odd error should never happen", e);
|
throw new RuntimeException("odd error should never happen", e);
|
||||||
} finally {
|
} finally {
|
||||||
if (dao != null) {
|
if (dao != null) {
|
||||||
try {
|
try {
|
||||||
dao.getConnection().setAutoCommit(true);
|
dao.getConnection().setAutoCommit(true);
|
||||||
} catch(SQLException excep) {
|
} catch (SQLException excep) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
tryClose(dao);
|
tryClose(dao);
|
||||||
|
@ -67,7 +108,119 @@ public class QueryRunner<T extends JdbcMapper> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public <E> E runRetry(final Runner<T, E> query) throws SQLException {
|
||||||
|
SQLException lastException = null;
|
||||||
|
int x = 0;
|
||||||
|
do {
|
||||||
|
try {
|
||||||
|
return runInTransaction(query);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
lastException = e;
|
||||||
|
try {
|
||||||
|
Thread.sleep(delayStrategy.getDelay(x));
|
||||||
|
} catch (InterruptedException e2) {
|
||||||
|
Thread.interrupted();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (++x <= retryCount);
|
||||||
|
throw lastException;
|
||||||
|
}
|
||||||
|
|
||||||
|
public <E> Future<E> runRetryFuture(final Runner<T, E> query) {
|
||||||
|
// todo: sleeps in thread, could use ScheduledExecutorService maybe?
|
||||||
|
return executorService.submit(new Callable<E>() {
|
||||||
|
@Override
|
||||||
|
public E call() throws Exception {
|
||||||
|
return runRetry(query);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public static interface Runner<T, E> {
|
public static interface Runner<T, E> {
|
||||||
E run(T dao) throws SQLException;
|
E run(T dao) throws SQLException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static interface DelayStrategy {
|
||||||
|
long getDelay(int attempt);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DelayStrategy exponentialBackoff() {
|
||||||
|
return defaultDelayStrategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DelayStrategy exponentialBackoff(long minBackoff, long maxBackoff, long slotTime) {
|
||||||
|
return new ExponentialBackoff(minBackoff, maxBackoff, slotTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DelayStrategy fixedDelay(long delay) {
|
||||||
|
return new FixedDelay(delay);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DelayStrategy incrementalDelay(long initialInterval, long incrementalInterval) {
|
||||||
|
return new IncrementalDelay(initialInterval, incrementalInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implements truncated binary exponential backoff to calculate retry delay per
|
||||||
|
* IEEE 802.3-2008 Section 1. There will be at most ten (10) contention periods
|
||||||
|
* of backoff, each contention period whose amount is equal to the delta backoff.
|
||||||
|
*
|
||||||
|
* @author Robert Buck (buck.robert.j@gmail.com)
|
||||||
|
*/
|
||||||
|
private static class ExponentialBackoff implements DelayStrategy {
|
||||||
|
|
||||||
|
public static final long DEFAULT_MIN_BACKOFF = TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS);
|
||||||
|
public static final long DEFAULT_MAX_BACKOFF = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
|
||||||
|
public static final long DEFAULT_SLOT_TIME = TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
private final long minBackoff;
|
||||||
|
private final long maxBackoff;
|
||||||
|
private final long slotTime;
|
||||||
|
|
||||||
|
private ExponentialBackoff() {
|
||||||
|
this(DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_SLOT_TIME);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ExponentialBackoff(long minBackoff, long maxBackoff, long slotTime) {
|
||||||
|
this.minBackoff = minBackoff;
|
||||||
|
this.maxBackoff = maxBackoff;
|
||||||
|
this.slotTime = slotTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getDelay(final int retryCount) {
|
||||||
|
final int MAX_CONTENTION_PERIODS = 10;
|
||||||
|
return retryCount == 0 ? 0 : Math.min(minBackoff + ThreadLocalRandom.current().nextInt(2 << Math.min(retryCount, MAX_CONTENTION_PERIODS - 1)) * slotTime, maxBackoff);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class FixedDelay implements DelayStrategy {
|
||||||
|
private final long delay;
|
||||||
|
|
||||||
|
private FixedDelay(final long delay) {
|
||||||
|
this.delay = delay;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getDelay(final int attempt) {
|
||||||
|
return delay;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class IncrementalDelay implements DelayStrategy {
|
||||||
|
|
||||||
|
private final long initialInterval;
|
||||||
|
private final long incrementalInterval;
|
||||||
|
|
||||||
|
private IncrementalDelay(long initialInterval, long incrementalInterval) {
|
||||||
|
this.initialInterval = initialInterval;
|
||||||
|
this.incrementalInterval = incrementalInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getDelay(final int attempt) {
|
||||||
|
return initialInterval + incrementalInterval * attempt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,15 +19,25 @@ public class QueryRunnerTest {
|
||||||
public Connection create() throws SQLException {
|
public Connection create() throws SQLException {
|
||||||
return QueryMapperTest.getConnection();
|
return QueryMapperTest.getConnection();
|
||||||
}
|
}
|
||||||
}));
|
}), QueryRunner.fixedDelay(5));
|
||||||
|
|
||||||
private void testPerson(final Person expected, final String query) throws Throwable {
|
private void testPerson(final Person expected, final String query) throws Throwable {
|
||||||
final Person actual = qr.runInTransaction(new QueryRunner.Runner<QueryMapper, Person>() {
|
final Person actual =
|
||||||
|
//qr.run(
|
||||||
|
//qr.runRetry(
|
||||||
|
qr.runRetryFuture(
|
||||||
|
new QueryRunner.Runner<QueryMapper, Person>() {
|
||||||
@Override
|
@Override
|
||||||
public Person run(final QueryMapper qm) throws SQLException {
|
public Person run(final QueryMapper qm) throws SQLException {
|
||||||
|
if(Math.random() < 0.5) {
|
||||||
|
System.out.println("fake fail");
|
||||||
|
throw new SQLException("fake 50% failure rate");
|
||||||
|
}
|
||||||
return qm.toObject(query, expected.getClass(), expected.getPersonNo());
|
return qm.toObject(query, expected.getClass(), expected.getPersonNo());
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
|
.get()
|
||||||
|
;
|
||||||
/*
|
/*
|
||||||
System.out.println("expected: " + expected);
|
System.out.println("expected: " + expected);
|
||||||
System.out.println("actual: " + actual);
|
System.out.println("actual: " + actual);
|
||||||
|
|
Loading…
Reference in New Issue