Notify listeners on applySnapshot
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
index 676667d8c211d6b9b7b0dd44c7cfd1fd52a1b24a..54a6f1ce232a1e51fe001924d10b00ef6ebcae0e 100644 (file)
@@ -21,6 +21,7 @@ import akka.dispatch.Futures;
 import akka.testkit.TestActorRef;
 import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.ArrayList;
@@ -34,6 +35,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -48,7 +50,6 @@ import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
@@ -69,6 +70,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
     private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
+    private final TransactionIdentifier tx = nextTransactionId();
+
 
     @Before
     public void setUp() {
@@ -100,8 +103,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test
     public void testCanCommitYesWithOneCohort() throws Exception {
         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
-                        CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1");
+                newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+                        CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx);
 
         verifyCanCommit(proxy.canCommit(), true);
         verifyCohortActors();
@@ -110,8 +113,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test
     public void testCanCommitNoWithOneCohort() throws Exception {
         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
-                        CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1");
+                newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+                        CanCommitTransactionReply.no(CURRENT_VERSION)))), tx);
 
         verifyCanCommit(proxy.canCommit(), false);
         verifyCohortActors();
@@ -120,11 +123,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test
     public void testCanCommitYesWithTwoCohorts() throws Exception {
         List<CohortInfo> cohorts = Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
                         CanCommitTransactionReply.yes(CURRENT_VERSION))),
-                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
                         CanCommitTransactionReply.yes(CURRENT_VERSION))));
-        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
 
         verifyCanCommit(proxy.canCommit(), true);
         verifyCohortActors();
@@ -133,12 +136,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test
     public void testCanCommitNoWithThreeCohorts() throws Exception {
         List<CohortInfo> cohorts = Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
                         CanCommitTransactionReply.yes(CURRENT_VERSION))),
-                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+                newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
                         CanCommitTransactionReply.no(CURRENT_VERSION))),
-                newCohortInfo(new CohortActor.Builder("txn-1")));
-        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+                newCohortInfo(new CohortActor.Builder(tx)));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
 
         verifyCanCommit(proxy.canCommit(), false);
         verifyCohortActors();
@@ -147,7 +150,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test(expected = TestException.class)
     public void testCanCommitWithExceptionFailure() throws Throwable {
         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1");
+                newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
 
         propagateExecutionExceptionCause(proxy.canCommit());
     }
@@ -155,7 +158,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test(expected = IllegalArgumentException.class)
     public void testCanCommitWithInvalidResponseType() throws Throwable {
         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1");
+                newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
 
         propagateExecutionExceptionCause(proxy.canCommit());
     }
@@ -163,10 +166,10 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test(expected = TestException.class)
     public void testCanCommitWithFailedCohortFuture() throws Throwable {
         List<CohortInfo> cohorts = Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1")),
+                newCohortInfo(new CohortActor.Builder(tx)),
                 newCohortInfoWithFailedFuture(new TestException()),
-                newCohortInfo(new CohortActor.Builder("txn-1")));
-        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+                newCohortInfo(new CohortActor.Builder(tx)));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
 
         propagateExecutionExceptionCause(proxy.canCommit());
     }
@@ -174,13 +177,13 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test
     public void testAllThreePhasesSuccessful() throws Exception {
         List<CohortInfo> cohorts = Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").
+                newCohortInfo(new CohortActor.Builder(tx).
                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
                         expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
-                newCohortInfo(new CohortActor.Builder("txn-1").
+                newCohortInfo(new CohortActor.Builder(tx).
                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
                         expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
-        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
 
         verifyCanCommit(proxy.canCommit(), true);
         verifySuccessfulFuture(proxy.preCommit());
@@ -191,13 +194,13 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test(expected = TestException.class)
     public void testCommitWithExceptionFailure() throws Throwable {
         List<CohortInfo> cohorts = Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").
+                newCohortInfo(new CohortActor.Builder(tx).
                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
                         expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
-                newCohortInfo(new CohortActor.Builder("txn-1").
+                newCohortInfo(new CohortActor.Builder(tx).
                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
                         expectCommit(new TestException())));
-        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
 
         verifyCanCommit(proxy.canCommit(), true);
         verifySuccessfulFuture(proxy.preCommit());
@@ -207,9 +210,9 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test(expected = IllegalArgumentException.class)
     public void testCommitWithInvalidResponseType() throws Throwable {
         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").
+                newCohortInfo(new CohortActor.Builder(tx).
                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
-                        expectCommit("invalid"))), "txn-1");
+                        expectCommit("invalid"))), tx);
 
         verifyCanCommit(proxy.canCommit(), true);
         verifySuccessfulFuture(proxy.preCommit());
@@ -219,8 +222,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test
     public void testAbort() throws Exception {
         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(
-                        AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1");
+                newCohortInfo(new CohortActor.Builder(tx).expectAbort(
+                        AbortTransactionReply.instance(CURRENT_VERSION)))), tx);
 
         verifySuccessfulFuture(proxy.abort());
         verifyCohortActors();
@@ -229,7 +232,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test
     public void testAbortWithFailure() throws Exception {
         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1");
+                newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
 
         // The exception should not get propagated.
         verifySuccessfulFuture(proxy.abort());
@@ -239,9 +242,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test
     public void testAbortWithFailedCohortFuture() throws Throwable {
         List<CohortInfo> cohorts = Arrays.asList(
-                newCohortInfoWithFailedFuture(new TestException()),
-                newCohortInfo(new CohortActor.Builder("txn-1")));
-        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+                newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx)));
+        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
 
         verifySuccessfulFuture(proxy.abort());
         verifyCohortActors();
@@ -250,24 +252,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Test
     public void testWithNoCohorts() throws Exception {
         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
-                Collections.<CohortInfo>emptyList(), "txn-1");
-
-        verifyCanCommit(proxy.canCommit(), true);
-        verifySuccessfulFuture(proxy.preCommit());
-        verifySuccessfulFuture(proxy.commit());
-        verifyCohortActors();
-    }
-
-    @Test
-    public void testBackwardsCompatibilityWithPreBoron() throws Exception {
-        List<CohortInfo> cohorts = Arrays.asList(
-                newCohortInfo(new CohortActor.Builder("txn-1").
-                        expectCanCommit(ThreePhaseCommitCohortMessages.CanCommitTransaction.class,
-                                CanCommitTransactionReply.yes(DataStoreVersions.LITHIUM_VERSION)).
-                        expectCommit(ThreePhaseCommitCohortMessages.CommitTransaction.class,
-                                CommitTransactionReply.instance(DataStoreVersions.LITHIUM_VERSION)),
-                        DataStoreVersions.LITHIUM_VERSION));
-        ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+                Collections.<CohortInfo>emptyList(), tx);
 
         verifyCanCommit(proxy.canCommit(), true);
         verifySuccessfulFuture(proxy.preCommit());
@@ -345,17 +330,17 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         @Override
         public void onReceive(Object message) {
             if(CanCommitTransaction.isSerializedType(message)) {
+                canCommitCount.incrementAndGet();
                 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
                         builder.expCanCommitType, builder.canCommitReply);
-                canCommitCount.incrementAndGet();
             } else if(CommitTransaction.isSerializedType(message)) {
+                commitCount.incrementAndGet();
                 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
                         builder.expCommitType, builder.commitReply);
-                commitCount.incrementAndGet();
             } else if(AbortTransaction.isSerializedType(message)) {
+                abortCount.incrementAndGet();
                 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
                         builder.expAbortType, builder.abortReply);
-                abortCount.incrementAndGet();
             } else {
                 assertionError = new AssertionError("Unexpected message " + message);
             }
@@ -403,10 +388,10 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
             private Object canCommitReply;
             private Object commitReply;
             private Object abortReply;
-            private final String transactionId;
+            private final TransactionIdentifier transactionId;
 
-            Builder(String transactionId) {
-                this.transactionId = transactionId;
+            Builder(TransactionIdentifier transactionId) {
+                this.transactionId = Preconditions.checkNotNull(transactionId);
             }
 
             Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {