Make private methods static
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / compat / PreLithiumTransactionProxyTest.java
index 2980f83564fa3b1cafc483cae580fef79838611f..6ca17838d0bbd71e97c38f9e03678c17ddf570bb 100644 (file)
@@ -8,39 +8,49 @@
 package org.opendaylight.controller.cluster.datastore.compat;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
 import akka.dispatch.Futures;
+import akka.util.Timeout;
 import com.google.common.base.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.junit.Ignore;
+import java.util.concurrent.TimeoutException;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Future;
 
 /**
  * Unit tests for backwards compatibility with pre-Lithium versions.
@@ -49,7 +59,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
  */
 public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest {
 
-    private WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+    private static WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
             @Override
             public boolean matches(Object argument) {
@@ -65,7 +75,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         return argThat(matcher);
     }
 
-    private MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+    private static MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
             @Override
             public boolean matches(Object argument) {
@@ -81,7 +91,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         return argThat(matcher);
     }
 
-    private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
+    private static DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
             @Override
             public boolean matches(Object argument) {
@@ -93,8 +103,37 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         return argThat(matcher);
     }
 
+    private static CanCommitTransaction eqCanCommitTransaction(final String transactionID) {
+        ArgumentMatcher<CanCommitTransaction> matcher = new ArgumentMatcher<CanCommitTransaction>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ThreePhaseCommitCohortMessages.CanCommitTransaction.class.equals(argument.getClass()) &&
+                        CanCommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private static CommitTransaction eqCommitTransaction(final String transactionID) {
+        ArgumentMatcher<CommitTransaction> matcher = new ArgumentMatcher<CommitTransaction>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ThreePhaseCommitCohortMessages.CommitTransaction.class.equals(argument.getClass()) &&
+                        CommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private static Future<Object> readySerializedTxReply(String path, short version) {
+        return Futures.successful(new ReadyTransactionReply(path, version).toSerializable());
+    }
+
     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version,
+                DefaultShardStrategy.DEFAULT_SHARD);
 
         NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -110,13 +149,13 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+        doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
                 get(5, TimeUnit.SECONDS);
@@ -130,17 +169,39 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        AbstractThreePhaseCommitCohort<?> proxy = transactionProxy.ready();
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
+        doThreePhaseCommit(actorRef, transactionProxy, proxy);
+
         return actorRef;
     }
 
+    private void doThreePhaseCommit(ActorRef actorRef, TransactionProxy transactionProxy,
+            AbstractThreePhaseCommitCohort<?> proxy) throws InterruptedException, ExecutionException, TimeoutException {
+        doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction(
+                        transactionProxy.getIdentifier().toString()), any(Timeout.class));
+
+        doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction(
+                        transactionProxy.getIdentifier().toString()), any(Timeout.class));
+
+        Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS);
+        assertEquals("canCommit", true, canCommit.booleanValue());
+
+        proxy.preCommit().get(3, TimeUnit.SECONDS);
+
+        proxy.commit().get(3, TimeUnit.SECONDS);
+
+        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction(
+                transactionProxy.getIdentifier().toString()), any(Timeout.class));
+
+        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction(
+                transactionProxy.getIdentifier().toString()), any(Timeout.class));
+    }
+
     @Test
     public void testCompatibilityWithBaseHeliumVersion() throws Exception {
         ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
@@ -158,34 +219,29 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
     }
 
     @Test
-    @Ignore
-    // FIXME: disabled until we can get the primary shard version from the ShardManager as we now skip
-    // creating transaction actors for write-only Tx's.
     public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
         short version = DataStoreVersions.HELIUM_2_VERSION;
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version);
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version,
+                DefaultShardStrategy.DEFAULT_SHARD);
 
         NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+        doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
-        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
-                eq(actorRef.path().toString()));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, testNode);
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        AbstractThreePhaseCommitCohort<?> proxy = (AbstractThreePhaseCommitCohort<?>) ready;
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        doThreePhaseCommit(actorRef, transactionProxy, proxy);
     }
 }