private volatile RequestException poisoned;
+ // Private constructor to avoid code duplication.
+ private AbstractClientConnection(final AbstractClientConnection<T> oldConn, final TransmitQueue newQueue) {
+ this.context = Preconditions.checkNotNull(oldConn.context);
+ this.cookie = Preconditions.checkNotNull(oldConn.cookie);
+ this.queue = Preconditions.checkNotNull(newQueue);
+ // Will be updated in finishReplay if needed.
+ this.lastReceivedTicks = oldConn.lastReceivedTicks;
+ }
+
+ // This constructor is only to be called by ConnectingClientConnection constructor.
// Do not allow subclassing outside of this package
- AbstractClientConnection(final ClientActorContext context, final Long cookie,
- final TransmitQueue queue) {
+ AbstractClientConnection(final ClientActorContext context, final Long cookie, final int queueDepth) {
this.context = Preconditions.checkNotNull(context);
this.cookie = Preconditions.checkNotNull(cookie);
- this.queue = Preconditions.checkNotNull(queue);
+ this.queue = new TransmitQueue.Halted(queueDepth);
this.lastReceivedTicks = currentTime();
}
+ // This constructor is only to be called (indirectly) by ReconnectingClientConnection constructor.
// Do not allow subclassing outside of this package
- AbstractClientConnection(final AbstractClientConnection<T> oldConnection, final int targetQueueSize) {
- this.context = oldConnection.context;
- this.cookie = oldConnection.cookie;
- this.queue = new TransmitQueue.Halted(targetQueueSize);
- this.lastReceivedTicks = oldConnection.lastReceivedTicks;
+ AbstractClientConnection(final AbstractClientConnection<T> oldConn) {
+ this(oldConn, new TransmitQueue.Halted(oldConn.queue, oldConn.currentTime()));
+ }
+
+ // This constructor is only to be called (indirectly) by ConnectedClientConnection constructor.
+ // Do not allow subclassing outside of this package
+ AbstractClientConnection(final AbstractClientConnection<T> oldConn, final T newBackend, final int queueDepth) {
+ this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime()));
}
public final ClientActorContext context() {
}
}
+ // To be called from ClientActorBehavior on ConnectedClientConnection after entries are replayed.
+ final void cancelDebt() {
+ queue.cancelDebt(currentTime());
+ }
+
public abstract Optional<T> getBackendInfo();
final Collection<ConnectionEntry> startReplay() {
package org.opendaylight.controller.cluster.access.client;
import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.base.Preconditions;
import java.util.Optional;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
private final T backend;
- AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
- super(context, cookie, new TransmitQueue.Transmitting(targetQueueSize(backend), backend));
- this.backend = Preconditions.checkNotNull(backend);
+ // To be called by ConnectedClientConnection only.
+ AbstractReceivingClientConnection(final AbstractClientConnection<T> oldConnection, final T newBackend) {
+ super(oldConnection, newBackend, targetQueueSize(newBackend));
+ this.backend = newBackend;
}
+ // To be called by ReconnectingClientConnection only.
AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
- super(oldConnection, targetQueueSize(oldConnection.backend));
+ super(oldConnection);
this.backend = oldConnection.backend;
}
* @param limit of open tasks to avoid exceeding
* @param ticksPerTask value to use as default
*/
- private AveragingProgressTracker(final int limit, final long ticksPerTask) {
+ private AveragingProgressTracker(final long limit, final long ticksPerTask) {
super(ticksPerTask);
tasksOpenLimit = limit;
noDelayThreshold = limit / 2;
*
* @param limit of open tasks to avoid exceeding
*/
- AveragingProgressTracker(final int limit) {
+ AveragingProgressTracker(final long limit) {
this(limit, DEFAULT_TICKS_PER_TASK);
}
/**
- * Create a copy of an existing tracker, all future tracking is fully independent.
+ * Construct a new tracker suitable for a new task queue related to a "reconnect".
*
- * @param tracker the instance to copy state from
+ * <p>The limit is set independently of the old tracker.
+ *
+ * @param oldTracker the tracker used for the previously used backend
+ * @param limit of open tasks to avoid exceeding
+ * @param now tick number corresponding to caller's present
+ */
+ AveragingProgressTracker(final ProgressTracker oldTracker, final long limit, final long now) {
+ super(oldTracker, now);
+ tasksOpenLimit = limit;
+ noDelayThreshold = limit / 2;
+ }
+
+ /**
+ * Construct a new tracker suitable for a new task queue related to a "reconnect".
+ *
+ * <p>The limit is copied from the old tracker.
+ *
+ * @param oldTracker the tracker used for the previously used backend
+ * @param now tick number corresponding to caller's present
*/
- AveragingProgressTracker(final AveragingProgressTracker tracker) {
- super(tracker);
- this.tasksOpenLimit = tracker.tasksOpenLimit;
- this.noDelayThreshold = tracker.noDelayThreshold;
+ AveragingProgressTracker(final AveragingProgressTracker oldTracker, final long now) {
+ this(oldTracker, oldTracker.tasksOpenLimit, now);
}
- // Public shared access (read-only) accessor-like methods
+ // Protected read-only methods
/**
* Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
* @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
*/
@Override
- public long estimateIsolatedDelay(final long now) {
+ protected long estimateIsolatedDelay(final long now) {
final long open = tasksOpen();
if (open <= noDelayThreshold) {
return 0L;
* Calculate the task capacity relative to the limit on open tasks. In real terms this value can be
* in the open interval (0.0, 0.5).
*/
- final double relativeRemainingCapacity = 1.0 - (((double) open) / tasksOpenLimit);
+ final double relativeRemainingCapacity = 1.0 - (double) open / tasksOpenLimit;
/*
* Calculate delay coefficient. It increases in inverse proportion to relative remaining capacity, approaching
@GuardedBy("connectionsLock")
@Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection<T> newConn);
- private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
+ private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> oldConn,
final T backend, final Throwable failure) {
if (failure != null) {
if (failure instanceof TimeoutException) {
- if (!conn.equals(connections.get(shard))) {
+ if (!oldConn.equals(connections.get(shard))) {
// AbstractClientConnection will remove itself when it decides there is no point in continuing,
// at which point we want to stop retrying
- LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn,
- failure);
+ LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard,
+ oldConn, failure);
return;
}
LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard,
RESOLVE_RETRY_DURATION, failure);
context().executeInActor(b -> {
- resolveConnection(shard, conn);
+ resolveConnection(shard, oldConn);
return b;
}, RESOLVE_RETRY_DURATION);
return;
cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure);
}
- conn.poison(cause);
+ oldConn.poison(cause);
return;
}
final Stopwatch sw = Stopwatch.createStarted();
// Create a new connected connection
- final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(conn.context(),
- conn.cookie(), backend);
- LOG.info("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
+ final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(oldConn, backend);
+ LOG.info("{}: resolving connection {} to {}", persistenceId(), oldConn, newConn);
// Start reconnecting without the old connection lock held
final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn));
// Lock the old connection and get a reference to its entries
- final Collection<ConnectionEntry> replayIterable = conn.startReplay();
+ final Collection<ConnectionEntry> replayIterable = oldConn.startReplay();
// Finish the connection attempt
final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable));
+ // Cancel sleep debt after entries were replayed, before new connection starts receiving.
+ newConn.cancelDebt();
+
// Install the forwarder, unlocking the old connection
- conn.finishReplay(forwarder);
+ oldConn.finishReplay(forwarder);
// Make sure new lookups pick up the new connection
- if (!connections.replace(shard, conn, newConn)) {
- final AbstractClientConnection<T> existing = connections.get(conn.cookie());
+ if (!connections.replace(shard, oldConn, newConn)) {
+ final AbstractClientConnection<T> existing = connections.get(oldConn.cookie());
LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo",
- persistenceId(), conn, existing, newConn);
+ persistenceId(), oldConn, existing, newConn);
} else {
- LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), conn, newConn, sw);
+ LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), oldConn, newConn, sw);
}
} finally {
connectionsLock.unlockWrite(stamp);
@Beta
@NotThreadSafe
public final class ConnectedClientConnection<T extends BackendInfo> extends AbstractReceivingClientConnection<T> {
- ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
- super(context, cookie, backend);
+
+ ConnectedClientConnection(final AbstractClientConnection<T> oldConnection, final T newBackend) {
+ super(oldConnection, newBackend);
}
@Override
// Initial state, never instantiated externally
ConnectingClientConnection(final ClientActorContext context, final Long cookie) {
- super(context, cookie, new TransmitQueue.Halted(TARGET_QUEUE_DEPTH));
+ super(context, cookie, TARGET_QUEUE_DEPTH);
}
@Override
}
/**
- * Construct a copy of an existing tracker, all future tracking is fully independent.
+ * Construct a new tracker suitable for a new task queue related to a "reconnect".
*
- * @param tracker the instance to copy state from
- */
- ProgressTracker(final ProgressTracker tracker) {
- this.defaultTicksPerTask = tracker.defaultTicksPerTask;
- this.tasksClosed = tracker.tasksClosed;
- this.tasksEncountered = tracker.tasksEncountered;
- this.lastClosed = tracker.lastClosed;
- this.lastIdle = tracker.lastIdle;
- this.nearestAllowed = tracker.nearestAllowed;
- this.elapsedBeforeIdle = tracker.elapsedBeforeIdle;
- }
-
- // Public shared access (read-only) accessor-like methods
-
- /**
- * Get the value of default ticks per task this instance was created to use.
+ * <p>When reconnecting to a new backend, tasks may need to be re-processed by the frontend,
+ * possibly resulting in a different number of tasks.
+ * Also, performance of the new backend can be different, but the perforance of the previous backend
+ * is generally still better estimate than defaults of a brand new tracker.
*
- * @return default ticks per task value
+ * <p>This "inheritance constructor" creates a new tracker with no open tasks (thus initially idle),
+ * but other internal values should lead to a balanced performance
+ * after tasks opened in the source tracker are "replayed" into the new tracker.
+ *
+ * <p>In particular, this impementation keeps the number of closed tasks the same,
+ * and makes it so ticksWorkedPerClosedTask is initially the same as in the old tracker.
+ *
+ * @param oldTracker the tracker used for the previously used backend
+ * @param now tick number corresponding to caller's present
*/
- public final long defaultTicksPerTask() {
- return defaultTicksPerTask;
+ ProgressTracker(final ProgressTracker oldTracker, final long now) {
+ this.defaultTicksPerTask = oldTracker.defaultTicksPerTask;
+ this.tasksEncountered = this.tasksClosed = oldTracker.tasksClosed;
+ this.lastClosed = oldTracker.lastClosed;
+ this.nearestAllowed = oldTracker.nearestAllowed; // Call cancelDebt explicitly if needed.
+ this.lastIdle = oldTracker.lastIdle;
+ this.elapsedBeforeIdle = oldTracker.elapsedBeforeIdle;
+ if (!oldTracker.isIdle()) {
+ transitToIdle(now);
+ }
}
+ // "Public" shared access (read-only) accessor-like methods
+
/**
* Get number of tasks closed so far.
*
* @return number of tasks known to be finished already; the value never decreases
*/
- public final long tasksClosed() {
+ final long tasksClosed() {
return tasksClosed;
}
*
* @return number of tasks encountered so far, open or finished; the value never decreases
*/
- public final long tasksEncountered() {
+ final long tasksEncountered() {
return tasksEncountered;
}
*
* @return number of tasks started but not finished yet
*/
- public final long tasksOpen() {
+ final long tasksOpen() { // TODO: Should we return int?
// TODO: Should we check the return value is non-negative?
return tasksEncountered - tasksClosed;
}
*
* @return {@code true} if every encountered task is already closed, {@code false} otherwise
*/
- public boolean isIdle() {
+ final boolean isIdle() {
return tasksClosed >= tasksEncountered;
}
* @param now tick number corresponding to caller's present
* @return number of ticks backend is neither idle nor responding
*/
- public long ticksStalling(final long now) {
+ final long ticksStalling(final long now) {
return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed;
}
+ // Read only protected methods.
+
/**
- * Number of ticks elapsed (before now) while there was at least one open task.
+ * Get the value of default ticks per task this instance was created to use.
*
- * @param now tick number corresponding to caller's present
- * @return number of ticks there was at least one task open
+ * @return default ticks per task value
*/
- public long ticksWorked(final long now) {
- return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
+ protected final long defaultTicksPerTask() {
+ return defaultTicksPerTask;
}
/**
* @param now tick number corresponding to caller's present
* @return total ticks worked divided by closed tasks, or the default value if no closed tasks
*/
- public double ticksWorkedPerClosedTask(final long now) {
+ protected final double ticksWorkedPerClosedTask(final long now) {
if (tasksClosed < 1) {
return defaultTicksPerTask;
}
return (double) ticksWorked(now) / tasksClosed;
}
+ // Read only private methods.
+
/**
- * Give an estimate of openTask() return value.
- *
- * <p>When the returned delay is positive, the caller thread should wait that time before opening additional task.
- *
- * <p>This method in general takes into account previously assigned delays to avoid overlaps.
+ * Number of ticks elapsed (before now) while there was at least one open task.
*
* @param now tick number corresponding to caller's present
- * @return delay (in ticks) after which another openTask() is fair to be called by the same thread again
+ * @return number of ticks there was at least one task open
*/
- public long estimateDelay(final long now) {
- return estimateAllowed(now) - now;
+ private long ticksWorked(final long now) {
+ return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
}
/**
* @param now tick number corresponding to caller's present
* @return estimated tick number when all threads with opened tasks are done waiting
*/
- public long estimateAllowed(final long now) {
+ private long estimateAllowed(final long now) {
return Math.max(now, nearestAllowed + estimateIsolatedDelay(now));
}
- // State-altering public methods.
+ // State-altering "public" methods.
/**
* Track a task is being closed.
* @param transmitTicks see TransitQueue#recordCompletion
* @param execNanos see TransitQueue#recordCompletion
*/
- public void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
+ final void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
if (isIdle()) {
LOG.info("Attempted to close a task while no tasks are open");
} else {
- protectedCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
+ unsafeCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
}
}
* @param now tick number corresponding to caller's present
* @return number of ticks (nanos) the caller thread should wait before opening another task
*/
- public long openTask(final long now) {
- protectedOpenTask(now);
+ final long openTask(final long now) {
+ openTaskWithoutThrottle(now);
return reserveDelay(now);
}
- // Internal state-altering methods. Protected instead of private,
- // allowing subclasses to weaken ad-hoc invariants of current implementation.
+ /**
+ * Set nearestAllowed value to now.
+ *
+ * <p>This is useful when new a backend has just connected,
+ * after a period of no working backend present.
+ * The accumulated delays should not limit future tasks.
+ * The queue fullness and the averaged backend performance are kept,
+ * as they should result in good enough estimations for new tasks.
+ *
+ * @param now tick number corresponding to caller's present
+ */
+ final void cancelDebt(final long now) {
+ nearestAllowed = now;
+ }
+
+ // Private state-altering methods.
/**
* Compute the next delay and update nearestAllowed value accordingly.
* @param now tick number corresponding to caller's present
* @return number of ticks (nanos) the caller thread should wait before opening another task
*/
- protected long reserveDelay(final long now) {
+ private long reserveDelay(final long now) {
nearestAllowed = estimateAllowed(now);
return nearestAllowed - now;
}
* @param transmitTicks see TransmitQueue#recordCompletion
* @param execNanos see TransmitQueue#recordCompletion
*/
- protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
+ private void unsafeCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
final long execNanos) {
tasksClosed++;
lastClosed = now;
if (isIdle()) {
- elapsedBeforeIdle += now - lastIdle;
+ transitToIdle(now);
}
}
*
* @param now tick number corresponding to caller's present
*/
- protected void protectedOpenTask(final long now) {
+ private void openTaskWithoutThrottle(final long now) {
if (isIdle()) {
- lastIdle = Math.max(now, lastIdle);
+ transitFromIdle(now);
}
tasksEncountered++;
}
+ /**
+ * Update lastIdle as a new "last" just hapened.
+ */
+ private void transitFromIdle(final long now) {
+ lastIdle = Math.max(now, lastIdle);
+ }
+
+ /**
+ * Update elapsedBeforeIdle as the "before" has jast moved.
+ */
+ private void transitToIdle(final long now) {
+ elapsedBeforeIdle += Math.max(0, now - lastIdle);
+ }
+
+ // Protected abstract read-only methods.
+
/**
* Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
*
* @param now tick number corresponding to caller's present
* @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
*/
- abstract long estimateIsolatedDelay(long now);
+ protected abstract long estimateIsolatedDelay(long now);
}
@NotThreadSafe
abstract class TransmitQueue {
static final class Halted extends TransmitQueue {
+ // For ConnectingClientConnection.
Halted(final int targetDepth) {
super(targetDepth);
}
+ // For ReconnectingClientConnection.
+ Halted(final TransmitQueue oldQueue, final long now) {
+ super(oldQueue, now);
+ }
+
@Override
int canTransmitCount(final int inflightSize) {
return 0;
private final BackendInfo backend;
private long nextTxSequence;
- Transmitting(final int targetDepth, final BackendInfo backend) {
- super(targetDepth);
+ // For ConnectedClientConnection.
+ Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) {
+ super(oldQueue, targetDepth, now);
this.backend = Preconditions.checkNotNull(backend);
}
private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
- private final ProgressTracker tracker;
+ private final AveragingProgressTracker tracker; // Cannot be just ProgressTracker as we are inheriting limits.
private ReconnectForwarder successor;
+ /**
+ * Construct initial transmitting queue.
+ */
TransmitQueue(final int targetDepth) {
tracker = new AveragingProgressTracker(targetDepth);
}
+ /**
+ * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
+ */
+ TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
+ tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
+ }
+
+ /**
+ * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
+ */
+ TransmitQueue(final TransmitQueue oldQueue, final long now) {
+ tracker = new AveragingProgressTracker(oldQueue.tracker, now);
+ }
+
+ /**
+ * Cancel the accumulated sum of delays as we expect the new backend to work now.
+ */
+ void cancelDebt(final long now) {
+ tracker.cancelDebt(now);
+ }
+
/**
* Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
* to be added to it during replay. When we set the successor all entries enqueued between when this methods
public static <T extends BackendInfo> ConnectedClientConnection<T> createConnectedConnection(
final ClientActorContext context, final Long cookie, final T backend) {
- return new ConnectedClientConnection<>(context, cookie, backend);
+ return new ConnectedClientConnection<>(new ConnectingClientConnection<>(context, cookie), backend);
}
public static void completeRequest(final AbstractClientConnection<? extends BackendInfo> connection,
package org.opendaylight.controller.cluster.access.client;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import com.google.common.testing.FakeTicker;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.internal.matchers.apachecommons.ReflectionEquals;
public class AveragingProgressTrackerTest {
private static final long CHECKER = TimeUnit.MILLISECONDS.toNanos(500);
assertEquals(0, averagingProgressTracker.estimateIsolatedDelay(ticker.read()));
}
- @Test
- public void copyObjectTest() {
- final AveragingProgressTracker copyAverageProgressTracker = new AveragingProgressTracker(
- averagingProgressTracker);
-
- // copied object is the same as original
- assertTrue(new ReflectionEquals(averagingProgressTracker).matches(copyAverageProgressTracker));
-
- // afterwards work of copied tracker is independent
- averagingProgressTracker.openTask(ticker.read());
-
- final long time = ticker.read();
- assertNotEquals("Trackers are expected to return different results for tracking",
- averagingProgressTracker.openTask(time), copyAverageProgressTracker.openTask(time));
- assertNotEquals("Trackers are expected to encounter different amount of tasks",
- averagingProgressTracker.tasksEncountered(), copyAverageProgressTracker.tasksEncountered());
-
- // and copied object is then no more the same as original
- assertFalse(new ReflectionEquals(averagingProgressTracker).matches(copyAverageProgressTracker));
- }
}
\ No newline at end of file
@Override
protected ConnectedClientConnection<BackendInfo> createConnection() {
final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10);
- return new ConnectedClientConnection<>(context, 0L, backend);
+ final ConnectingClientConnection<BackendInfo> connectingConn = new ConnectingClientConnection<>(context, 0L);
+ return new ConnectedClientConnection<>(connectingConn, backend);
}
@Override
}
private void setupBackend() {
- final ConnectedClientConnection<?> newConn = new ConnectedClientConnection<>(mockContext, mockCookie,
- mockBackendInfo);
- queue.setForwarder(new SimpleReconnectForwarder(newConn));
- queue = newConn;
+ final ConnectingClientConnection<BackendInfo> connectingConn =
+ new ConnectingClientConnection<>(mockContext, mockCookie);
+ final ConnectedClientConnection<BackendInfo> connectedConn =
+ new ConnectedClientConnection<>(connectingConn, mockBackendInfo);
+ queue.setForwarder(new SimpleReconnectForwarder(connectedConn));
+ queue = connectedConn;
}
private void assertTransmit(final Request<?, ?> expected, final long sequence) {
@Override
protected ReconnectingClientConnection<BackendInfo> createConnection() {
final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10);
-
- final ConnectedClientConnection<BackendInfo> oldConnection =
- new ConnectedClientConnection<>(context, 0L, backend);
- return new ReconnectingClientConnection<>(oldConnection, mock(RequestException.class));
+ final ConnectingClientConnection<BackendInfo> connectingConn = new ConnectingClientConnection<>(context, 0L);
+ final ConnectedClientConnection<BackendInfo> connectedConn =
+ new ConnectedClientConnection<>(connectingConn, backend);
+ return new ReconnectingClientConnection<>(connectedConn, mock(RequestException.class));
}
@Override
private BackendInfo backendInfo;
+ private static long now() {
+ return Ticker.systemTicker().read();
+ }
+
@Override
protected int getMaxInFlightMessages() {
return backendInfo.getMaxMessages();
@Override
protected TransmitQueue.Transmitting createQueue() {
backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
- return new TransmitQueue.Transmitting(0, backendInfo);
+ return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now());
}
@Test
final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
final Consumer<Response<?, ?>> callback1 = createConsumerMock();
final Consumer<Response<?, ?>> callback2 = createConsumerMock();
- final long now1 = Ticker.systemTicker().read();
- final long now2 = Ticker.systemTicker().read();
+ final long now1 = now();
+ final long now2 = now();
//enqueue 2 entries
queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
public void testEnqueueCanTransmit() throws Exception {
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
- final long now = Ticker.systemTicker().read();
+ final long now = now();
queue.enqueue(new ConnectionEntry(request, callback, now), now);
final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
assertEquals(request, requestEnvelope.getMessage());
public void testEnqueueBackendFull() throws Exception {
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
- final long now = Ticker.systemTicker().read();
+ final long now = now();
final int sentMessages = getMaxInFlightMessages() + 1;
for (int i = 0; i < sentMessages; i++) {
queue.enqueue(new ConnectionEntry(request, callback, now), now);
public void testTransmit() throws Exception {
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
- final long now = Ticker.systemTicker().read();
+ final long now = now();
final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
queue.transmit(entry, now);
final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);