From: Tom Pantelis Date: Wed, 4 Mar 2015 07:53:09 +0000 (-0500) Subject: Refactor LegacyTransactionConntextImpl X-Git-Tag: release/lithium~431^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=12a411738c5d9030a1a196f04fdf5ec0f4c3395e Refactor LegacyTransactionConntextImpl We have a compat package so the LegacyTransactionConntextImpl really belongs there to keep it separate from the current classes. Also Legacy really isn't a good name as it's too generic. The LegacyTransactionConntextImpl is really specific to pre-Lithium compatibility so I renamed it to PreLithiumTransactionContextImpl to make it clear. I also refactored the helium compatibility tests in TransactionProxyTest into a new PreLithiumTransactionProxyTest class to keep them separate. This will make it easier to remove PreLithiumTransactionContextImpl and its test code when it's no longer needed. Since it needs the same setup as TransactionProxyTest and some of the utility methods, I moved all the setup and utility methods to an AbstractTransactionProxyTest to avoid duplication. This also makes TransactionProxyTest smaller as it now only contains the test methods. Change-Id: I9a81c7a1a07fac9098d497ff301c5d1909094c1b Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java index 80aa3793c1..ec867dda0b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java @@ -12,7 +12,7 @@ import com.google.common.base.Preconditions; import java.util.concurrent.Semaphore; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; -final class OperationCompleter extends OnComplete { +public final class OperationCompleter extends OnComplete { private final Semaphore operationLimiter; OperationCompleter(Semaphore operationLimiter){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index 1e222e4c0a..be7169859d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -37,7 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; -class TransactionContextImpl extends AbstractTransactionContext { +public class TransactionContextImpl extends AbstractTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class); private final ActorContext actorContext; @@ -49,7 +49,7 @@ class TransactionContextImpl extends AbstractTransactionContext { private final OperationCompleter operationCompleter; private BatchedModifications batchedModifications; - TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, + protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { super(identifier); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 0bc82af335..e5119cf299 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -731,7 +732,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return new TransactionContextImpl(transactionPath, transactionActor, identifier, actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); } else { - return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier, + return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier, actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java similarity index 83% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java index 65d82b73d9..e407c7cc47 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java @@ -5,9 +5,11 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.cluster.datastore; +package org.opendaylight.controller.cluster.datastore.compat; import akka.actor.ActorSelection; +import org.opendaylight.controller.cluster.datastore.OperationCompleter; +import org.opendaylight.controller.cluster.datastore.TransactionContextImpl; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; @@ -23,9 +25,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * * @author Thomas Pantelis */ -class LegacyTransactionContextImpl extends TransactionContextImpl { +public class PreLithiumTransactionContextImpl extends TransactionContextImpl { - LegacyTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, + public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java new file mode 100644 index 0000000000..60625a05fd --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -0,0 +1,388 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.dispatch.Futures; +import akka.testkit.JavaTestKit; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.CheckedFuture; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; +import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +/** + * Abstract base class for TransactionProxy unit tests. + * + * @author Thomas Pantelis + */ +public abstract class AbstractTransactionProxyTest { + private static ActorSystem system; + + private final Configuration configuration = new MockConfiguration(); + + @Mock + protected ActorContext mockActorContext; + + private SchemaContext schemaContext; + + @Mock + private ClusterWrapper mockClusterWrapper; + + protected final String memberName = "mock-member"; + + protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2). + shardBatchedModificationCount(1); + + @BeforeClass + public static void setUpClass() throws IOException { + + Config config = ConfigFactory.parseMap(ImmutableMap.builder(). + put("akka.actor.default-dispatcher.type", + "akka.testkit.CallingThreadDispatcherConfigurator").build()). + withFallback(ConfigFactory.load()); + system = ActorSystem.create("test", config); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + + schemaContext = TestModel.createTestContext(); + + doReturn(getSystem()).when(mockActorContext).getActorSystem(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); + doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); + doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); + doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); + doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + ShardStrategyFactory.setConfiguration(configuration); + } + + protected ActorSystem getSystem() { + return system; + } + + protected CreateTransaction eqCreateTransaction(final String memberName, + final TransactionType type) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) { + CreateTransaction obj = CreateTransaction.fromSerializable(argument); + return obj.getTransactionId().startsWith(memberName) && + obj.getTransactionType() == type.ordinal(); + } + + return false; + } + }; + + return argThat(matcher); + } + + protected DataExists eqSerializedDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + protected DataExists eqDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return (argument instanceof DataExists) && + ((DataExists)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + protected ReadData eqSerializedReadData() { + return eqSerializedReadData(TestModel.TEST_PATH); + } + + protected ReadData eqSerializedReadData(final YangInstanceIdentifier path) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + ReadData.fromSerializable(argument).getPath().equals(path); + } + }; + + return argThat(matcher); + } + + protected ReadData eqReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return (argument instanceof ReadData) && + ((ReadData)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + protected Future readySerializedTxReply(String path) { + return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); + } + + protected Future readyTxReply(String path) { + return Futures.successful((Object)new ReadyTransactionReply(path)); + } + + protected Future readSerializedDataReply(NormalizedNode data, + short transactionVersion) { + return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable()); + } + + protected Future readSerializedDataReply(NormalizedNode data) { + return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION); + } + + protected Future readDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION)); + } + + protected Future dataExistsSerializedReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists).toSerializable()); + } + + protected Future dataExistsReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists)); + } + + protected Future batchedModificationsReply(int count) { + return Futures.successful(new BatchedModificationsReply(count)); + } + + protected Future incompleteFuture(){ + return mock(Future.class); + } + + protected ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); + } + + protected void expectBatchedModifications(ActorRef actorRef, int count) { + doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + } + + protected void expectBatchedModifications(int count) { + doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(BatchedModifications.class)); + } + + protected void expectIncompleteBatchedModifications() { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(BatchedModifications.class)); + } + + protected void expectReadyTransaction(ActorRef actorRef) { + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + } + + protected void expectFailedBatchedModifications(ActorRef actorRef) { + doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + } + + protected CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){ + return CreateTransactionReply.newBuilder() + .setTransactionActorPath(actorRef.path().toString()) + .setTransactionId("txn-1") + .setMessageVersion(transactionVersion) + .build(); + } + + protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) { + ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + doReturn(actorSystem.actorSelection(actorRef.path())). + when(mockActorContext).actorSelection(actorRef.path().toString()); + + doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); + + doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + return actorRef; + } + + protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, + TransactionType type, int transactionVersion) { + ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem); + + doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), + eqCreateTransaction(memberName, type)); + + return actorRef; + } + + protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { + return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION); + } + + + protected void propagateReadFailedExceptionCause(CheckedFuture future) + throws Throwable { + + try { + future.checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + throw e.getCause(); + } + } + + protected List captureBatchedModifications(ActorRef actorRef) { + ArgumentCaptor batchedModificationsCaptor = + ArgumentCaptor.forClass(BatchedModifications.class); + verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync( + eq(actorSelection(actorRef)), batchedModificationsCaptor.capture()); + + List batchedModifications = filterCaptured( + batchedModificationsCaptor, BatchedModifications.class); + return batchedModifications; + } + + protected List filterCaptured(ArgumentCaptor captor, Class type) { + List captured = new ArrayList<>(); + for(T c: captor.getAllValues()) { + if(type.isInstance(c)) { + captured.add(c); + } + } + + return captured; + } + + protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) { + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), expected); + } + + protected void verifyBatchedModifications(Object message, Modification... expected) { + assertEquals("Message type", BatchedModifications.class, message.getClass()); + BatchedModifications batchedModifications = (BatchedModifications)message; + assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size()); + for(int i = 0; i < batchedModifications.getModifications().size(); i++) { + Modification actual = batchedModifications.getModifications().get(i); + assertEquals("Modification type", expected[i].getClass(), actual.getClass()); + assertEquals("getPath", ((AbstractModification)expected[i]).getPath(), + ((AbstractModification)actual).getPath()); + if(actual instanceof WriteModification) { + assertEquals("getData", ((WriteModification)expected[i]).getData(), + ((WriteModification)actual).getData()); + } + } + } + + protected void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy, + Object... expReplies) throws Exception { + assertEquals("getReadyOperationFutures size", expReplies.length, + proxy.getCohortFutures().size()); + + int i = 0; + for( Future future: proxy.getCohortFutures()) { + assertNotNull("Ready operation Future is null", future); + + Object expReply = expReplies[i++]; + if(expReply instanceof ActorSelection) { + ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + assertEquals("Cohort actor path", expReply, actual); + } else { + // Expecting exception. + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from ready operation Future"); + } catch(Exception e) { + // Expected + } + } + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index abfe7eae22..8278d3cffc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -6,11 +6,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; @@ -21,78 +19,44 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; -import akka.testkit.JavaTestKit; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatcher; import org.mockito.InOrder; -import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; -import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.DataExists; -import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; -import org.opendaylight.controller.cluster.datastore.messages.DeleteData; -import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; -import org.opendaylight.controller.cluster.datastore.messages.MergeData; -import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadData; -import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.WriteData; -import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; -import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; -import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; -import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.Promise; import scala.concurrent.duration.Duration; @SuppressWarnings("resource") -public class TransactionProxyTest { +public class TransactionProxyTest extends AbstractTransactionProxyTest { @SuppressWarnings("serial") static class TestException extends RuntimeException { @@ -102,291 +66,6 @@ public class TransactionProxyTest { CheckedFuture invoke(TransactionProxy proxy) throws Exception; } - private static ActorSystem system; - - private final Configuration configuration = new MockConfiguration(); - - @Mock - private ActorContext mockActorContext; - - private SchemaContext schemaContext; - - @Mock - private ClusterWrapper mockClusterWrapper; - - private final String memberName = "mock-member"; - - private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2). - shardBatchedModificationCount(1); - - @BeforeClass - public static void setUpClass() throws IOException { - - Config config = ConfigFactory.parseMap(ImmutableMap.builder(). - put("akka.actor.default-dispatcher.type", - "akka.testkit.CallingThreadDispatcherConfigurator").build()). - withFallback(ConfigFactory.load()); - system = ActorSystem.create("test", config); - } - - @AfterClass - public static void tearDownClass() throws IOException { - JavaTestKit.shutdownActorSystem(system); - system = null; - } - - @Before - public void setUp(){ - MockitoAnnotations.initMocks(this); - - schemaContext = TestModel.createTestContext(); - - doReturn(getSystem()).when(mockActorContext).getActorSystem(); - doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); - doReturn(memberName).when(mockActorContext).getCurrentMemberName(); - doReturn(schemaContext).when(mockActorContext).getSchemaContext(); - doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); - doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); - doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); - doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); - - ShardStrategyFactory.setConfiguration(configuration); - } - - private ActorSystem getSystem() { - return system; - } - - private CreateTransaction eqCreateTransaction(final String memberName, - final TransactionType type) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) { - CreateTransaction obj = CreateTransaction.fromSerializable(argument); - return obj.getTransactionId().startsWith(memberName) && - obj.getTransactionType() == type.ordinal(); - } - - return false; - } - }; - - return argThat(matcher); - } - - private DataExists eqSerializedDataExists() { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) && - DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); - } - }; - - return argThat(matcher); - } - - private DataExists eqDataExists() { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return (argument instanceof DataExists) && - ((DataExists)argument).getPath().equals(TestModel.TEST_PATH); - } - }; - - return argThat(matcher); - } - - private ReadData eqSerializedReadData() { - return eqSerializedReadData(TestModel.TEST_PATH); - } - - private ReadData eqSerializedReadData(final YangInstanceIdentifier path) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && - ReadData.fromSerializable(argument).getPath().equals(path); - } - }; - - return argThat(matcher); - } - - private ReadData eqReadData() { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return (argument instanceof ReadData) && - ((ReadData)argument).getPath().equals(TestModel.TEST_PATH); - } - }; - - return argThat(matcher); - } - - private WriteData eqLegacyWriteData(final NormalizedNode nodeToWrite) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) { - WriteData obj = WriteData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); - } - - return false; - } - }; - - return argThat(matcher); - } - - private MergeData eqLegacyMergeData(final NormalizedNode nodeToWrite) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) { - MergeData obj = MergeData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); - } - - return false; - } - }; - - return argThat(matcher); - } - - private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) && - DeleteData.fromSerializable(argument).getPath().equals(expPath); - } - }; - - return argThat(matcher); - } - - private Future readySerializedTxReply(String path) { - return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); - } - - private Future readyTxReply(String path) { - return Futures.successful((Object)new ReadyTransactionReply(path)); - } - - private Future readSerializedDataReply(NormalizedNode data, - short transactionVersion) { - return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable()); - } - - private Future readSerializedDataReply(NormalizedNode data) { - return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION); - } - - private Future readDataReply(NormalizedNode data) { - return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION)); - } - - private Future dataExistsSerializedReply(boolean exists) { - return Futures.successful(new DataExistsReply(exists).toSerializable()); - } - - private Future dataExistsReply(boolean exists) { - return Futures.successful(new DataExistsReply(exists)); - } - - private Future batchedModificationsReply(int count) { - return Futures.successful(new BatchedModificationsReply(count)); - } - - private Future incompleteFuture(){ - return mock(Future.class); - } - - private ActorSelection actorSelection(ActorRef actorRef) { - return getSystem().actorSelection(actorRef.path()); - } - - private void expectBatchedModifications(ActorRef actorRef, int count) { - doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); - } - - private void expectBatchedModifications(int count) { - doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); - } - - private void expectIncompleteBatchedModifications() { - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); - } - - private void expectReadyTransaction(ActorRef actorRef) { - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - } - - private void expectFailedBatchedModifications(ActorRef actorRef) { - doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); - } - - private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){ - return CreateTransactionReply.newBuilder() - .setTransactionActorPath(actorRef.path().toString()) - .setTransactionId("txn-1") - .setMessageVersion(transactionVersion) - .build(); - } - - private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) { - ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - doReturn(actorSystem.actorSelection(actorRef.path())). - when(mockActorContext).actorSelection(actorRef.path().toString()); - - doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). - when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); - - doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); - - return actorRef; - } - - private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, - TransactionType type, int transactionVersion) { - ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem); - - doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), - eqCreateTransaction(memberName, type)); - - return actorRef; - } - - private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { - return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION); - } - - - private void propagateReadFailedExceptionCause(CheckedFuture future) - throws Throwable { - - try { - future.checkedGet(5, TimeUnit.SECONDS); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - throw e.getCause(); - } - } - @Test public void testRead() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); @@ -840,31 +519,6 @@ public class TransactionProxyTest { BatchedModificationsReply.class); } - private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy, - Object... expReplies) throws Exception { - assertEquals("getReadyOperationFutures size", expReplies.length, - proxy.getCohortFutures().size()); - - int i = 0; - for( Future future: proxy.getCohortFutures()) { - assertNotNull("Ready operation Future is null", future); - - Object expReply = expReplies[i++]; - if(expReply instanceof ActorSelection) { - ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); - assertEquals("Cohort actor path", expReply, actual); - } else { - // Expecting exception. - try { - Await.result(future, Duration.create(5, TimeUnit.SECONDS)); - fail("Expected exception from ready operation Future"); - } catch(Exception e) { - // Expected - } - } - } - } - @Test public void testReady() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); @@ -898,74 +552,6 @@ public class TransactionProxyTest { isA(BatchedModifications.class)); } - private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), - READ_WRITE, version); - - NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - - doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedReadData()); - - doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); - - doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode)); - - doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH)); - - expectReadyTransaction(actorRef); - - doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), - eq(actorRef.path().toString())); - - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); - - Optional> readOptional = transactionProxy.read(TestModel.TEST_PATH). - get(5, TimeUnit.SECONDS); - - assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); - assertEquals("Response NormalizedNode", testNode, readOptional.get()); - - transactionProxy.write(TestModel.TEST_PATH, testNode); - - transactionProxy.merge(TestModel.TEST_PATH, testNode); - - transactionProxy.delete(TestModel.TEST_PATH); - - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class, - ShardTransactionMessages.DeleteDataReply.class); - - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); - - return actorRef; - } - - @Test - public void testCompatibilityWithBaseHeliumVersion() throws Exception { - ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION); - - verify(mockActorContext).resolvePath(eq(actorRef.path().toString()), - eq(actorRef.path().toString())); - } - - @Test - public void testCompatibilityWithHeliumR1Version() throws Exception { - ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION); - - verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()), - eq(actorRef.path().toString())); - } - @Test public void testReadyWithRecordingOperationFailure() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); @@ -1777,49 +1363,4 @@ public class TransactionProxyTest { verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); } - - private List captureBatchedModifications(ActorRef actorRef) { - ArgumentCaptor batchedModificationsCaptor = - ArgumentCaptor.forClass(BatchedModifications.class); - verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync( - eq(actorSelection(actorRef)), batchedModificationsCaptor.capture()); - - List batchedModifications = filterCaptured( - batchedModificationsCaptor, BatchedModifications.class); - return batchedModifications; - } - - private List filterCaptured(ArgumentCaptor captor, Class type) { - List captured = new ArrayList<>(); - for(T c: captor.getAllValues()) { - if(type.isInstance(c)) { - captured.add(c); - } - } - - return captured; - } - - private void verifyOneBatchedModification(ActorRef actorRef, Modification expected) { - List batchedModifications = captureBatchedModifications(actorRef); - assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - - verifyBatchedModifications(batchedModifications.get(0), expected); - } - - private void verifyBatchedModifications(Object message, Modification... expected) { - assertEquals("Message type", BatchedModifications.class, message.getClass()); - BatchedModifications batchedModifications = (BatchedModifications)message; - assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size()); - for(int i = 0; i < batchedModifications.getModifications().size(); i++) { - Modification actual = batchedModifications.getModifications().get(i); - assertEquals("Modification type", expected[i].getClass(), actual.getClass()); - assertEquals("getPath", ((AbstractModification)expected[i]).getPath(), - ((AbstractModification)actual).getPath()); - if(actual instanceof WriteModification) { - assertEquals("getData", ((WriteModification)expected[i]).getData(), - ((WriteModification)actual).getData()); - } - } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java new file mode 100644 index 0000000000..08c32c9a54 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.compat; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import akka.actor.ActorRef; +import akka.dispatch.Futures; +import com.google.common.base.Optional; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy; +import org.opendaylight.controller.cluster.datastore.TransactionProxy; +import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; +import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; + +/** + * Unit tests for backwards compatibility with pre-Lithium versions. + * + * @author Thomas Pantelis + */ +public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest { + + private WriteData eqLegacyWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) { + WriteData obj = WriteData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); + } + + return false; + } + }; + + return argThat(matcher); + } + + private MergeData eqLegacyMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) { + MergeData obj = MergeData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); + } + + return false; + } + }; + + return argThat(matcher); + } + + private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) && + DeleteData.fromSerializable(argument).getPath().equals(expPath); + } + }; + + return argThat(matcher); + } + + private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version); + + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData(TestModel.TEST_PATH)); + + doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); + + doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode)); + + doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH)); + + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + Optional> readOptional = transactionProxy.read(TestModel.TEST_PATH). + get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + assertEquals("Response NormalizedNode", testNode, readOptional.get()); + + transactionProxy.write(TestModel.TEST_PATH, testNode); + + transactionProxy.merge(TestModel.TEST_PATH, testNode); + + transactionProxy.delete(TestModel.TEST_PATH); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + return actorRef; + } + + @Test + public void testCompatibilityWithBaseHeliumVersion() throws Exception { + ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION); + + verify(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + } + + @Test + public void testCompatibilityWithHeliumR1Version() throws Exception { + ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION); + + verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + } +}