package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
-
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
-
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.List;
-import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
+ @SuppressWarnings("serial")
+ static class TestException extends RuntimeException {
+ }
+
@Mock
private ActorContext actorContext;
+ @Mock
+ private DatastoreContext datastoreContext;
+
+ @Mock
+ private Timer commitTimer;
+
+ @Mock
+ private Timer.Context commitTimerContext;
+
+ @Mock
+ private Snapshot commitSnapshot;
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
+ doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+ doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+ doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+ doReturn(commitTimerContext).when(commitTimer).time();
+ doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+ doReturn(10.0).when(actorContext).getTxCreationLimit();
}
- private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
- List<ActorPath> cohorts = Lists.newArrayList();
+ private Future<ActorSelection> newCohort() {
+ ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
+ ActorSelection actorSelection = getSystem().actorSelection(path);
+ return Futures.successful(actorSelection);
+ }
+
+ private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
for(int i = 1; i <= nCohorts; i++) {
- ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
- cohorts.add(path);
- doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+ cohortFutures.add(newCohort());
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
+ }
+
+ private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
+ throws Exception {
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
+ cohortFutures.add(newCohort());
+ cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
+
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
}
private void setupMockActorContext(Class<?> requestType, Object... responses) {
.successful(((SerializableMessage) responses[i]).toSerializable()));
}
- stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
- isA(requestType), any(FiniteDuration.class));
+ stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
+ isA(requestType), any(Timeout.class));
+
+ doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
+ .when(actorContext).getTransactionCommitOperationTimeout();
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
- verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
- any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+ verify(actorContext, times(nCohorts)).executeOperationAsync(
+ any(ActorSelection.class), isA(requestType), any(Timeout.class));
+ }
+
+ private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Expected ExecutionException");
+ } catch(ExecutionException e) {
+ throw e.getCause();
+ }
}
@Test
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES);
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", true, future.get());
+ assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(false));
+ CanCommitTransactionReply.NO);
future = proxy.canCommit();
- assertEquals("canCommit", false, future.get());
+ assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", true, future.get());
+ assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
ThreePhaseCommitCohortProxy proxy = setupProxy(3);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
- new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", false, future.get());
+ Boolean actual = future.get(5, TimeUnit.SECONDS);
+
+ assertEquals("canCommit", false, actual);
- verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
- @Test(expected = ExecutionException.class)
- public void testCanCommitWithExceptionFailure() throws Exception {
+ @Test(expected = TestException.class)
+ public void testCanCommitWithExceptionFailure() throws Throwable {
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
- proxy.canCommit().get();
+ propagateExecutionExceptionCause(proxy.canCommit());
}
@Test(expected = ExecutionException.class)
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply());
- proxy.canCommit().get();
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
}
- @Test
- public void testPreCommit() throws Exception {
- ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
- setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
- new PreCommitTransactionReply());
+ @Test(expected = TestException.class)
+ public void testCanCommitWithFailedCohortPath() throws Throwable {
- proxy.preCommit().get();
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
- verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
+ try {
+ propagateExecutionExceptionCause(proxy.canCommit());
+ } finally {
+ verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
+ }
}
- @Test(expected = ExecutionException.class)
- public void testPreCommitWithFailure() throws Exception {
- ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+ @Test
+ public void testPreCommit() throws Exception {
+ // Precommit is currently a no-op
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
- new PreCommitTransactionReply(), new RuntimeException("mock"));
+ new PreCommitTransactionReply());
- proxy.preCommit().get();
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
}
@Test
setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
- proxy.abort().get();
+ proxy.abort().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
}
setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
// The exception should not get propagated.
- proxy.abort().get();
+ proxy.abort().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
}
+ @Test
+ public void testAbortWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ // The exception should not get propagated.
+ proxy.abort().get(5, TimeUnit.SECONDS);
+
+ verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
+ }
+
@Test
public void testCommit() throws Exception {
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
new CommitTransactionReply());
- proxy.commit().get();
+ proxy.commit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
}
- @Test(expected = ExecutionException.class)
- public void testCommitWithFailure() throws Exception {
+ @Test(expected = TestException.class)
+ public void testCommitWithFailure() throws Throwable {
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
- new RuntimeException("mock"));
+ new TestException());
- proxy.commit().get();
+ propagateExecutionExceptionCause(proxy.commit());
}
@Test(expected = ExecutionException.class)
- public void teseCommitWithInvalidResponseType() throws Exception {
+ public void testCommitWithInvalidResponseType() throws Exception {
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
- proxy.commit().get();
+ proxy.commit().get(5, TimeUnit.SECONDS);
+ }
+
+ @Test(expected = TestException.class)
+ public void testCommitWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ try {
+ propagateExecutionExceptionCause(proxy.commit());
+ } finally {
+
+ verify(actorContext, never()).setTxCreationLimit(anyLong());
+ verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+ }
+
}
@Test
- public void testGetCohortPaths() {
+ public void testAllThreePhasesSuccessful() throws Exception {
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
- List<ActorPath> paths = proxy.getCohortPaths();
- assertNotNull("getCohortPaths returned null", paths);
- assertEquals("getCohortPaths size", 2, paths.size());
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
+
+ setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+ new PreCommitTransactionReply(), new PreCommitTransactionReply());
+
+ setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
+ new CommitTransactionReply(), new CommitTransactionReply());
+
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
+ proxy.commit().get(5, TimeUnit.SECONDS);
+
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+
+ }
+
+ @Test
+ public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
+ proxy.commit().get(5, TimeUnit.SECONDS);
+
+ verify(actorContext, never()).setTxCreationLimit(anyLong());
}
}