I was testing with simulated latency in the followers. With a high enough
latency and tx thru-put, the pending commit queue in the
ShardCommitCoordinator got increasingly behind until latencies built up
enough to cause AskTimeoutExeption's on the front-end.
The rate limiter was throttling but not enough. I realized that the rate
limiter times the commit phase but not the canCommit phase. The latter is
what times out with pending tx's sitting in the queue waiting for canCommit.
So I changed ThreePhaseCommitCohortProxy to also time the canCommit
phase. This alleviated the timeouts - even with a really high max latency of
500 ms and 100 tx / sec client thru-put. The rate limiter thru-put
reduced it to about 3 / sec.
Change-Id: I6dc73d1d657519b9410ad034c69d26f19a0cb263
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit
1783b656d3b1712c73c6596f3ae15e54e801d5e5)
@Override
public void failure() {
}
+
+ @Override
+ public void pause() {
+ }
+
+ @Override
+ public void resume() {
+ }
};
class Reference extends AtomicReference<OperationCallback> {
}
void run();
+ void pause();
+ void resume();
void success();
void failure();
}
private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
+ private volatile OperationCallback commitOperationCallback;
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
return;
}
+ commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
+ new TransactionRateLimitingCallback(actorContext);
+
+ commitOperationCallback.run();
+
final Object message = new CanCommitTransaction(transactionId).toSerializable();
final Iterator<ActorSelection> iterator = cohorts.iterator();
LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
+ commitOperationCallback.failure();
return;
}
+ // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
+ // this means we'll only time the first transaction canCommit which should be fine.
+ commitOperationCallback.pause();
+
boolean result = true;
if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
CanCommitTransactionReply reply =
@Override
public ListenableFuture<Void> commit() {
- OperationCallback operationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
- new TransactionRateLimitingCallback(actorContext);
+ OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
+ OperationCallback.NO_OP_CALLBACK;
return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
LOG.debug("Tx {} finish {}", transactionId, operationName);
}
- callback.run();
+ callback.resume();
Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
package org.opendaylight.controller.cluster.datastore;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
/**
* transaction
*/
public class TransactionRateLimitingCallback implements OperationCallback{
+ private static Ticker TICKER = Ticker.systemTicker();
+
+ private static enum State {
+ STOPPED,
+ RUNNING,
+ PAUSED
+ }
+
private final Timer commitTimer;
- private Timer.Context timerContext;
+ private long startTime;
+ private long elapsedTime;
+ private volatile State state = State.STOPPED;
TransactionRateLimitingCallback(ActorContext actorContext){
commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT);
@Override
public void run() {
- timerContext = commitTimer.time();
+ Preconditions.checkState(state == State.STOPPED, "state is not STOPPED");
+ resume();
+ }
+
+ @Override
+ public void pause() {
+ if(state == State.RUNNING) {
+ elapsedTime += TICKER.read() - startTime;
+ state = State.PAUSED;
+ }
+ }
+
+ @Override
+ public void resume() {
+ if(state != State.RUNNING) {
+ startTime = TICKER.read();
+ state = State.RUNNING;
+ }
}
@Override
public void success() {
- Preconditions.checkState(timerContext != null, "Call run before success");
- timerContext.stop();
+ Preconditions.checkState(state != State.STOPPED, "state is STOPPED");
+ pause();
+ commitTimer.update(elapsedTime, TimeUnit.NANOSECONDS);
+ state = State.STOPPED;
}
@Override
// not going to be useful - so we leave it as it is
}
+ @VisibleForTesting
+ static void setTicker(Ticker ticker) {
+ TICKER = ticker;
+ }
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Ticker;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+/**
+ * Unit tests for TransactionRateLimitingCallback.
+ *
+ * @author Thomas Pantelis
+ */
+public class TransactionRateLimitingCallbackTest {
+
+ @Mock
+ ActorContext mockContext;
+
+ @Mock
+ Timer mockTimer;
+
+ @Mock
+ Ticker mockTicker;
+
+ TransactionRateLimitingCallback callback;
+
+ @Before
+ public void setUp(){
+ MockitoAnnotations.initMocks(this);
+ doReturn(mockTimer).when(mockContext).getOperationTimer(ActorContext.COMMIT);
+ callback = new TransactionRateLimitingCallback(mockContext);
+ TransactionRateLimitingCallback.setTicker(mockTicker);
+ }
+
+ @Test
+ public void testSuccessWithoutPause() {
+ doReturn(1L).doReturn(201L).when(mockTicker).read();
+
+ callback.run();
+ callback.success();
+
+ verify(mockTimer).update(200L, TimeUnit.NANOSECONDS);
+ }
+
+ @Test
+ public void testSuccessWithPause() {
+ doReturn(1L).doReturn(201L).doReturn(301L).doReturn(351L).when(mockTicker).read();
+
+ callback.run();
+ callback.pause();
+ callback.pause();
+ callback.resume();
+ callback.resume();
+ callback.success();
+
+ verify(mockTimer).update(250L, TimeUnit.NANOSECONDS);
+ }
+
+ @Test
+ public void testFailure() {
+ doReturn(1L).when(mockTicker).read();
+
+ callback.run();
+ callback.failure();
+
+ verify(mockTimer, never()).update(anyLong(), any(TimeUnit.class));
+ }
+
+ @Test
+ public void testSuccessWithoutRun(){
+ try {
+ callback.success();
+ fail("Expected IllegalStateException");
+ } catch(IllegalStateException e){
+
+ }
+
+ verify(mockTimer, never()).update(anyLong(), any(TimeUnit.class));
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-
-public class TransactionRateLimitingCommitCallbackTest {
- @Mock
- public ActorContext actorContext;
-
- @Mock
- public DatastoreContext datastoreContext;
-
- @Mock
- public Timer commitTimer;
-
- @Mock
- private Timer.Context commitTimerContext;
-
- @Mock
- private Snapshot commitSnapshot;
-
- @Before
- public void setUp(){
- MockitoAnnotations.initMocks(this);
- doReturn(datastoreContext).when(actorContext).getDatastoreContext();
- doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
- doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
- doReturn(commitTimerContext).when(commitTimer).time();
- doReturn(commitSnapshot).when(commitTimer).getSnapshot();
- }
-
- @Test
- public void testSuccess(){
- TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
- commitCallback.run();
- commitCallback.success();
-
- verify(commitTimerContext).stop();
- }
-
- @Test
- public void testSuccessWithoutRun(){
- TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
-
- try {
- commitCallback.success();
- fail("Expected IllegalStateException");
- } catch(IllegalStateException e){
-
- }
- verify(commitTimerContext, never()).stop();
- }
-
-
- @Test
- public void testFailure(){
- TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
- commitCallback.run();
- commitCallback.failure();
-
- verify(commitTimerContext, never()).stop();
- }
-
-}
\ No newline at end of file