Merge changes I880310f2,I9f437328,I552372db,I587fb203,I05f0bd94, ...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
index 7407897dfac504872e37d35c374397369a8c5576..5e53b29db13f7fff0accf1397dc691a1f071d8a6 100644 (file)
@@ -1,5 +1,20 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
@@ -11,12 +26,16 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
@@ -42,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -50,24 +70,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 
 @SuppressWarnings("resource")
 public class TransactionProxyTest {
@@ -137,9 +139,13 @@ public class TransactionProxyTest {
         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
             @Override
             public boolean matches(Object argument) {
-                CreateTransaction obj = CreateTransaction.fromSerializable(argument);
-                return obj.getTransactionId().startsWith(memberName) &&
-                       obj.getTransactionType() == type.ordinal();
+                if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                    CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+                    return obj.getTransactionId().startsWith(memberName) &&
+                            obj.getTransactionType() == type.ordinal();
+                }
+
+                return false;
             }
         };
 
@@ -195,16 +201,25 @@ public class TransactionProxyTest {
     }
 
     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+        return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
+            final int transactionVersion) {
         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
             @Override
             public boolean matches(Object argument) {
-                if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
-                    return false;
+                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+                        WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+                           ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
+
+                    WriteData obj = WriteData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) &&
+                           obj.getData().equals(nodeToWrite);
                 }
 
-                WriteData obj = WriteData.fromSerializable(argument, schemaContext);
-                return obj.getPath().equals(TestModel.TEST_PATH) &&
-                       obj.getData().equals(nodeToWrite);
+                return false;
             }
         };
 
@@ -228,16 +243,25 @@ public class TransactionProxyTest {
     }
 
     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+        return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
+            final int transactionVersion) {
         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
             @Override
             public boolean matches(Object argument) {
-                if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
-                    return false;
+                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+                        MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+                           ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
+
+                    MergeData obj = MergeData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) &&
+                           obj.getData().equals(nodeToWrite);
                 }
 
-                MergeData obj = MergeData.fromSerializable(argument, schemaContext);
-                return obj.getPath().equals(TestModel.TEST_PATH) &&
-                       obj.getData().equals(nodeToWrite);
+                return false;
             }
         };
 
@@ -293,13 +317,17 @@ public class TransactionProxyTest {
         return Futures.successful((Object)new ReadyTransactionReply(path));
     }
 
+    private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
+            short transactionVersion) {
+        return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+    }
 
     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
+        return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(schemaContext, data));
+        return Futures.successful(new ReadDataReply(data));
     }
 
     private Future<Object> dataExistsSerializedReply(boolean exists) {
@@ -310,16 +338,24 @@ public class TransactionProxyTest {
         return Futures.successful(new DataExistsReply(exists));
     }
 
+    private Future<Object> writeSerializedDataReply(short version) {
+        return Futures.successful(new WriteDataReply().toSerializable(version));
+    }
+
     private Future<Object> writeSerializedDataReply() {
-        return Futures.successful(new WriteDataReply().toSerializable());
+        return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<WriteDataReply> writeDataReply() {
         return Futures.successful(new WriteDataReply());
     }
 
+    private Future<Object> mergeSerializedDataReply(short version) {
+        return Futures.successful(new MergeDataReply().toSerializable(version));
+    }
+
     private Future<Object> mergeSerializedDataReply() {
-        return Futures.successful(new MergeDataReply().toSerializable());
+        return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<MergeDataReply> mergeDataReply() {
@@ -346,7 +382,8 @@ public class TransactionProxyTest {
             .build();
     }
 
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type, int transactionVersion) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
@@ -358,13 +395,11 @@ public class TransactionProxyTest {
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
 
-        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
         return actorRef;
     }
 
     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
-        return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
     }
 
 
@@ -718,7 +753,7 @@ public class TransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                WriteDataReply.class);
     }
 
     @Test(expected=IllegalStateException.class)
@@ -760,7 +795,7 @@ public class TransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS);
+                MergeDataReply.class);
     }
 
     @Test
@@ -836,22 +871,25 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                WriteDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
     }
 
-    @Test
-    public void testReadyForwardCompatibility() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+    private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
+                READ_WRITE, version);
 
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+        doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+
+        doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
 
         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
@@ -859,12 +897,17 @@ public class TransactionProxyTest {
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
-        transactionProxy.read(TestModel.TEST_PATH);
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
+                get(5, TimeUnit.SECONDS);
 
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", testNode, readOptional.get());
+
+        transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+        transactionProxy.merge(TestModel.TEST_PATH, testNode);
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
@@ -873,14 +916,29 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
+        return actorRef;
+    }
+
+    @Test
+    public void testCompatibilityWithBaseHeliumVersion() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
+
         verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
     }
 
+    @Test
+    public void testCompatibilityWithHeliumR1Version() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
+
+        verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+
     @Test
     public void testReadyWithRecordingOperationFailure() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
@@ -914,7 +972,7 @@ public class TransactionProxyTest {
         verifyCohortFutures(proxy, TestException.class);
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+                MergeDataReply.class, TestException.class);
     }
 
     @Test
@@ -942,7 +1000,7 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS);
+                MergeDataReply.class);
 
         verifyCohortFutures(proxy, TestException.class);
     }