+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
MockitoAnnotations.initMocks(this);
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
doReturn(datastoreContext).when(actorContext).getDatastoreContext();
- doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+ doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
doReturn(commitTimerContext).when(commitTimer).time();
doReturn(commitSnapshot).when(commitTimer).getSnapshot();
- doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
doReturn(10.0).when(actorContext).getTxCreationLimit();
}
stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
isA(requestType), any(Timeout.class));
+
+ doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
+ .when(actorContext).getTransactionCommitOperationTimeout();
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
any(ActorSelection.class), isA(requestType), any(Timeout.class));
}
- private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+ private static void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
try {
future.get(5, TimeUnit.SECONDS);
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- CanCommitTransactionReply.YES);
+ setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.yes(CURRENT_VERSION));
ListenableFuture<Boolean> future = proxy.canCommit();
assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- CanCommitTransactionReply.NO);
+ setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.no(CURRENT_VERSION));
future = proxy.canCommit();
assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
- verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, CanCommitTransaction.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
+ setupMockActorContext(CanCommitTransaction.class,
+ CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
ListenableFuture<Boolean> future = proxy.canCommit();
assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
- verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, CanCommitTransaction.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = setupProxy(3);
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
+ setupMockActorContext(CanCommitTransaction.class,
+ CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.no(CURRENT_VERSION),
+ CanCommitTransactionReply.yes(CURRENT_VERSION));
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+ Boolean actual = future.get(5, TimeUnit.SECONDS);
- verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+ assertEquals("canCommit", false, actual);
+
+ verifyCohortInvocations(2, CanCommitTransaction.class);
}
@Test(expected = TestException.class)
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
+ setupMockActorContext(CanCommitTransaction.class, new TestException());
propagateExecutionExceptionCause(proxy.canCommit());
}
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new PreCommitTransactionReply());
+ setupMockActorContext(CanCommitTransaction.class,
+ new CommitTransactionReply());
proxy.canCommit().get(5, TimeUnit.SECONDS);
}
try {
propagateExecutionExceptionCause(proxy.canCommit());
} finally {
- verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(0, CanCommitTransaction.class);
}
}
// Precommit is currently a no-op
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
- new PreCommitTransactionReply());
-
proxy.preCommit().get(5, TimeUnit.SECONDS);
}
public void testAbort() throws Exception {
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
+ setupMockActorContext(AbortTransaction.class, new AbortTransactionReply());
proxy.abort().get(5, TimeUnit.SECONDS);
- verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(1, AbortTransaction.class);
}
@Test
public void testAbortWithFailure() throws Exception {
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+ setupMockActorContext(AbortTransaction.class, new RuntimeException("mock"));
// The exception should not get propagated.
proxy.abort().get(5, TimeUnit.SECONDS);
- verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(1, AbortTransaction.class);
}
@Test
// The exception should not get propagated.
proxy.abort().get(5, TimeUnit.SECONDS);
- verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(0, AbortTransaction.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
- setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+ setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
new CommitTransactionReply());
proxy.commit().get(5, TimeUnit.SECONDS);
- verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, CommitTransaction.class);
}
@Test(expected = TestException.class)
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
- setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+ setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
new TestException());
propagateExecutionExceptionCause(proxy.commit());
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
+ setupMockActorContext(CommitTransaction.class, new AbortTransactionReply());
proxy.commit().get(5, TimeUnit.SECONDS);
}
propagateExecutionExceptionCause(proxy.commit());
} finally {
- verify(actorContext, never()).setTxCreationLimit(anyLong());
- verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(0, CommitTransaction.class);
}
}
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
-
- setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
- new PreCommitTransactionReply(), new PreCommitTransactionReply());
+ setupMockActorContext(CanCommitTransaction.class,
+ CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
- setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
+ setupMockActorContext(CommitTransaction.class,
new CommitTransactionReply(), new CommitTransactionReply());
assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
proxy.preCommit().get(5, TimeUnit.SECONDS);
proxy.commit().get(5, TimeUnit.SECONDS);
- verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
- verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, CanCommitTransaction.class);
+ verifyCohortInvocations(2, CommitTransaction.class);
- // Verify that the creation limit was changed to 0.5 (based on setup)
- verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
}
@Test
proxy.preCommit().get(5, TimeUnit.SECONDS);
proxy.commit().get(5, TimeUnit.SECONDS);
- verify(actorContext, never()).setTxCreationLimit(anyLong());
}
}