X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fcompat%2FPreLithiumTransactionProxyTest.java;h=6ca17838d0bbd71e97c38f9e03678c17ddf570bb;hp=08c32c9a54fee3afad454ed0d72c02d2d561d1c3;hb=ecccb6d5b43dd73aef0d2d19349d19ee9b4728f7;hpb=12d62e4939a27a3deba065bce79274c9eaf69964 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java index 08c32c9a54..6ca17838d0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java @@ -8,37 +8,49 @@ package org.opendaylight.controller.cluster.datastore.compat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY; import akka.actor.ActorRef; import akka.dispatch.Futures; +import akka.util.Timeout; import com.google.common.base.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; -import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy; import org.opendaylight.controller.cluster.datastore.TransactionProxy; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import scala.concurrent.Future; /** * Unit tests for backwards compatibility with pre-Lithium versions. @@ -47,7 +59,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; */ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest { - private WriteData eqLegacyWriteData(final NormalizedNode nodeToWrite) { + private static WriteData eqLegacyWriteData(final NormalizedNode nodeToWrite) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { @@ -63,7 +75,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest return argThat(matcher); } - private MergeData eqLegacyMergeData(final NormalizedNode nodeToWrite) { + private static MergeData eqLegacyMergeData(final NormalizedNode nodeToWrite) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { @@ -79,7 +91,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest return argThat(matcher); } - private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) { + private static DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { @@ -91,8 +103,37 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest return argThat(matcher); } + private static CanCommitTransaction eqCanCommitTransaction(final String transactionID) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ThreePhaseCommitCohortMessages.CanCommitTransaction.class.equals(argument.getClass()) && + CanCommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID); + } + }; + + return argThat(matcher); + } + + private static CommitTransaction eqCommitTransaction(final String transactionID) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ThreePhaseCommitCohortMessages.CommitTransaction.class.equals(argument.getClass()) && + CommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID); + } + }; + + return argThat(matcher); + } + + private static Future readySerializedTxReply(String path, short version) { + return Futures.successful(new ReadyTransactionReply(path, version).toSerializable()); + } + private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version, + DefaultShardStrategy.DEFAULT_SHARD); NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -108,13 +149,13 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH)); - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), eq(actorRef.path().toString())); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); Optional> readOptional = transactionProxy.read(TestModel.TEST_PATH). get(5, TimeUnit.SECONDS); @@ -128,17 +169,39 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest transactionProxy.delete(TestModel.TEST_PATH); - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + AbstractThreePhaseCommitCohort proxy = transactionProxy.ready(); verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + doThreePhaseCommit(actorRef, transactionProxy, proxy); + return actorRef; } + private void doThreePhaseCommit(ActorRef actorRef, TransactionProxy transactionProxy, + AbstractThreePhaseCommitCohort proxy) throws InterruptedException, ExecutionException, TimeoutException { + doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction( + transactionProxy.getIdentifier().toString()), any(Timeout.class)); + + doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction( + transactionProxy.getIdentifier().toString()), any(Timeout.class)); + + Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit.booleanValue()); + + proxy.preCommit().get(3, TimeUnit.SECONDS); + + proxy.commit().get(3, TimeUnit.SECONDS); + + verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction( + transactionProxy.getIdentifier().toString()), any(Timeout.class)); + + verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction( + transactionProxy.getIdentifier().toString()), any(Timeout.class)); + } + @Test public void testCompatibilityWithBaseHeliumVersion() throws Exception { ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION); @@ -154,4 +217,31 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()), eq(actorRef.path().toString())); } + + @Test + public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception { + short version = DataStoreVersions.HELIUM_2_VERSION; + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version, + DefaultShardStrategy.DEFAULT_SHARD); + + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); + + doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, testNode); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + AbstractThreePhaseCommitCohort proxy = (AbstractThreePhaseCommitCohort) ready; + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + doThreePhaseCommit(actorRef, transactionProxy, proxy); + } }