BUG 1735 Registering a data change listener should be asynchronous
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
index 87231f08849ed02398472bb792a40a48753a96be..1cd0f85fa1917057f77144f23009a7edb65c0150 100644 (file)
@@ -7,8 +7,9 @@ import akka.dispatch.Futures;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.isA;
@@ -31,14 +32,20 @@ import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import scala.concurrent.duration.FiniteDuration;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+
+import scala.concurrent.Future;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
+    @SuppressWarnings("serial")
+    static class TestException extends RuntimeException {
+    }
+
     @Mock
     private ActorContext actorContext;
 
@@ -49,15 +56,28 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         doReturn(getSystem()).when(actorContext).getActorSystem();
     }
 
-    private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
-        List<ActorPath> cohorts = Lists.newArrayList();
+    private Future<ActorPath> newCohortPath() {
+        ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
+        doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+        return Futures.successful(path);
+    }
+
+    private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
+        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
         for(int i = 1; i <= nCohorts; i++) {
-            ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
-            cohorts.add(path);
-            doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+            cohortPathFutures.add(newCohortPath());
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+    }
+
+    private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
+            throws Exception {
+        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+        cohortPathFutures.add(newCohortPath());
+        cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
+
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
     }
 
     private void setupMockActorContext(Class<?> requestType, Object... responses) {
@@ -72,12 +92,22 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         }
 
         stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
-                isA(requestType), any(FiniteDuration.class));
+                isA(requestType));
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
         verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
-                any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+                any(ActorSelection.class), isA(requestType));
+    }
+
+    private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Expected ExecutionException");
+        } catch(ExecutionException e) {
+            throw e.getCause();
+        }
     }
 
     @Test
@@ -90,14 +120,14 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", true, future.get());
+        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
                 new CanCommitTransactionReply(false));
 
         future = proxy.canCommit();
 
-        assertEquals("canCommit", false, future.get());
+        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
@@ -112,7 +142,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", true, future.get());
+        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
@@ -128,19 +158,19 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", false, future.get());
+        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
 
         verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCanCommitWithExceptionFailure() throws Exception {
+    @Test(expected = TestException.class)
+    public void testCanCommitWithExceptionFailure() throws Throwable {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
 
-        proxy.canCommit().get();
+        propagateExecutionExceptionCause(proxy.canCommit());
     }
 
     @Test(expected = ExecutionException.class)
@@ -151,7 +181,19 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply());
 
-        proxy.canCommit().get();
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCanCommitWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        try {
+            propagateExecutionExceptionCause(proxy.canCommit());
+        } finally {
+            verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
+        }
     }
 
     @Test
@@ -161,7 +203,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply());
 
-        proxy.preCommit().get();
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
     }
@@ -173,7 +215,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply(), new RuntimeException("mock"));
 
-        proxy.preCommit().get();
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
     }
 
     @Test
@@ -182,7 +224,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
 
-        proxy.abort().get();
+        proxy.abort().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
     }
@@ -194,11 +236,22 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
 
         // The exception should not get propagated.
-        proxy.abort().get();
+        proxy.abort().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
     }
 
+    @Test
+    public void testAbortWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        // The exception should not get propagated.
+        proxy.abort().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
+    }
+
     @Test
     public void testCommit() throws Exception {
 
@@ -207,39 +260,64 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
                 new CommitTransactionReply());
 
-        proxy.commit().get();
+        proxy.commit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCommitWithFailure() throws Exception {
+    @Test(expected = TestException.class)
+    public void testCommitWithFailure() throws Throwable {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
-                new RuntimeException("mock"));
+                new TestException());
 
-        proxy.commit().get();
+        propagateExecutionExceptionCause(proxy.commit());
     }
 
     @Test(expected = ExecutionException.class)
-    public void teseCommitWithInvalidResponseType() throws Exception {
+    public void testCommitWithInvalidResponseType() throws Exception {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
 
-        proxy.commit().get();
+        proxy.commit().get(5, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCommitWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        try {
+            propagateExecutionExceptionCause(proxy.commit());
+        } finally {
+            verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+        }
     }
 
     @Test
-    public void testGetCohortPaths() {
+    public void testAllThreePhasesSuccessful() throws Exception {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
-        List<ActorPath> paths = proxy.getCohortPaths();
-        assertNotNull("getCohortPaths returned null", paths);
-        assertEquals("getCohortPaths size", 2, paths.size());
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+
+        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+                new PreCommitTransactionReply(), new PreCommitTransactionReply());
+
+        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
+                new CommitTransactionReply(), new CommitTransactionReply());
+
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
     }
 }