Refactor LegacyTransactionConntextImpl 32/16032/4
authorTom Pantelis <tpanteli@brocade.com>
Wed, 4 Mar 2015 07:53:09 +0000 (02:53 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Sun, 8 Mar 2015 02:03:34 +0000 (21:03 -0500)
We have a compat package so the LegacyTransactionConntextImpl really
belongs there to keep it separate from the current classes. Also Legacy
really isn't a good name as it's too generic. The
LegacyTransactionConntextImpl is really specific to  pre-Lithium
compatibility so I renamed it to PreLithiumTransactionContextImpl to
make it clear.

I also refactored the helium compatibility tests in TransactionProxyTest
into a new PreLithiumTransactionProxyTest class to keep them separate.
This will make it easier to remove PreLithiumTransactionContextImpl and
its test code when it's no longer needed. Since it needs the same setup
as TransactionProxyTest and some of the utility methods, I moved all the
setup and utility methods to an AbstractTransactionProxyTest to avoid
duplication. This also makes TransactionProxyTest smaller as it now only
contains the test methods.

Change-Id: I9a81c7a1a07fac9098d497ff301c5d1909094c1b
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java with 83% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java [new file with mode: 0644]

index 80aa3793c1a2daf140441152c80a03b2408b8dd6..ec867dda0bb924d7ffbab7ca3ded58f52ad90fb8 100644 (file)
@@ -12,7 +12,7 @@ import com.google.common.base.Preconditions;
 import java.util.concurrent.Semaphore;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 
-final class OperationCompleter extends OnComplete<Object> {
+public final class OperationCompleter extends OnComplete<Object> {
     private final Semaphore operationLimiter;
 
     OperationCompleter(Semaphore operationLimiter){
index 1e222e4c0a667307ce9b7cf3c24585b7f61d5911..be7169859db56be2851028b2d538c9ce29210c9f 100644 (file)
@@ -37,7 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
-class TransactionContextImpl extends AbstractTransactionContext {
+public class TransactionContextImpl extends AbstractTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
     private final ActorContext actorContext;
@@ -49,7 +49,7 @@ class TransactionContextImpl extends AbstractTransactionContext {
     private final OperationCompleter operationCompleter;
     private BatchedModifications batchedModifications;
 
-    TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+    protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
             ActorContext actorContext, SchemaContext schemaContext,
             boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
         super(identifier);
index 0bc82af3358c4747901fab005db5c19cf2d41288..e5119cf299c4d5e8b10974c25e5f9f02cc12c7f4 100644 (file)
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -731,7 +732,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 return new TransactionContextImpl(transactionPath, transactionActor, identifier,
                     actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
             } else {
-                return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier,
+                return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
                         actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
             }
         }
@@ -5,9 +5,11 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.cluster.datastore;
+package org.opendaylight.controller.cluster.datastore.compat;
 
 import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.OperationCompleter;
+import org.opendaylight.controller.cluster.datastore.TransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
@@ -23,9 +25,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  *
  * @author Thomas Pantelis
  */
-class LegacyTransactionContextImpl extends TransactionContextImpl {
+public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
 
-    LegacyTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+    public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
             ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationCompleter operationCompleter) {
         super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
new file mode 100644 (file)
index 0000000..60625a0
--- /dev/null
@@ -0,0 +1,388 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.Futures;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
+import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Abstract base class for TransactionProxy unit tests.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class AbstractTransactionProxyTest {
+    private static ActorSystem system;
+
+    private final Configuration configuration = new MockConfiguration();
+
+    @Mock
+    protected ActorContext mockActorContext;
+
+    private SchemaContext schemaContext;
+
+    @Mock
+    private ClusterWrapper mockClusterWrapper;
+
+    protected final String memberName = "mock-member";
+
+    protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
+            shardBatchedModificationCount(1);
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+
+        Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
+                put("akka.actor.default-dispatcher.type",
+                        "akka.testkit.CallingThreadDispatcherConfigurator").build()).
+                withFallback(ConfigFactory.load());
+        system = ActorSystem.create("test", config);
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws IOException {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+
+        schemaContext = TestModel.createTestContext();
+
+        doReturn(getSystem()).when(mockActorContext).getActorSystem();
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
+        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+        doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
+        doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
+        doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        ShardStrategyFactory.setConfiguration(configuration);
+    }
+
+    protected ActorSystem getSystem() {
+        return system;
+    }
+
+    protected CreateTransaction eqCreateTransaction(final String memberName,
+            final TransactionType type) {
+        ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
+            @Override
+            public boolean matches(Object argument) {
+                if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                    CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+                    return obj.getTransactionId().startsWith(memberName) &&
+                            obj.getTransactionType() == type.ordinal();
+                }
+
+                return false;
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    protected DataExists eqSerializedDataExists() {
+        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+            @Override
+            public boolean matches(Object argument) {
+                return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+                       DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    protected DataExists eqDataExists() {
+        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+            @Override
+            public boolean matches(Object argument) {
+                return (argument instanceof DataExists) &&
+                    ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    protected ReadData eqSerializedReadData() {
+        return eqSerializedReadData(TestModel.TEST_PATH);
+    }
+
+    protected ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
+        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+                       ReadData.fromSerializable(argument).getPath().equals(path);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    protected ReadData eqReadData() {
+        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+            @Override
+            public boolean matches(Object argument) {
+                return (argument instanceof ReadData) &&
+                    ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    protected Future<Object> readySerializedTxReply(String path) {
+        return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
+    }
+
+    protected Future<Object> readyTxReply(String path) {
+        return Futures.successful((Object)new ReadyTransactionReply(path));
+    }
+
+    protected Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
+            short transactionVersion) {
+        return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable());
+    }
+
+    protected Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
+        return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    protected Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
+        return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
+    }
+
+    protected Future<Object> dataExistsSerializedReply(boolean exists) {
+        return Futures.successful(new DataExistsReply(exists).toSerializable());
+    }
+
+    protected Future<DataExistsReply> dataExistsReply(boolean exists) {
+        return Futures.successful(new DataExistsReply(exists));
+    }
+
+    protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
+        return Futures.successful(new BatchedModificationsReply(count));
+    }
+
+    protected Future<Object> incompleteFuture(){
+        return mock(Future.class);
+    }
+
+    protected ActorSelection actorSelection(ActorRef actorRef) {
+        return getSystem().actorSelection(actorRef.path());
+    }
+
+    protected void expectBatchedModifications(ActorRef actorRef, int count) {
+        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+    }
+
+    protected void expectBatchedModifications(int count) {
+        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
+    }
+
+    protected void expectIncompleteBatchedModifications() {
+        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
+    }
+
+    protected void expectReadyTransaction(ActorRef actorRef) {
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+    }
+
+    protected void expectFailedBatchedModifications(ActorRef actorRef) {
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+    }
+
+    protected CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
+        return CreateTransactionReply.newBuilder()
+            .setTransactionActorPath(actorRef.path().toString())
+            .setTransactionId("txn-1")
+            .setMessageVersion(transactionVersion)
+            .build();
+    }
+
+    protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
+        ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+        doReturn(actorSystem.actorSelection(actorRef.path())).
+                when(mockActorContext).actorSelection(actorRef.path().toString());
+
+        doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
+                when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
+
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        return actorRef;
+    }
+
+    protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type, int transactionVersion) {
+        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
+
+        doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
+                        eqCreateTransaction(memberName, type));
+
+        return actorRef;
+    }
+
+    protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
+    }
+
+
+    protected void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
+            throws Throwable {
+
+        try {
+            future.checkedGet(5, TimeUnit.SECONDS);
+            fail("Expected ReadFailedException");
+        } catch(ReadFailedException e) {
+            throw e.getCause();
+        }
+    }
+
+    protected List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
+        ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
+                ArgumentCaptor.forClass(BatchedModifications.class);
+        verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
+                eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
+
+        List<BatchedModifications> batchedModifications = filterCaptured(
+                batchedModificationsCaptor, BatchedModifications.class);
+        return batchedModifications;
+    }
+
+    protected <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
+        List<T> captured = new ArrayList<>();
+        for(T c: captor.getAllValues()) {
+            if(type.isInstance(c)) {
+                captured.add(c);
+            }
+        }
+
+        return captured;
+    }
+
+    protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), expected);
+    }
+
+    protected void verifyBatchedModifications(Object message, Modification... expected) {
+        assertEquals("Message type", BatchedModifications.class, message.getClass());
+        BatchedModifications batchedModifications = (BatchedModifications)message;
+        assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+        for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
+            Modification actual = batchedModifications.getModifications().get(i);
+            assertEquals("Modification type", expected[i].getClass(), actual.getClass());
+            assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
+                    ((AbstractModification)actual).getPath());
+            if(actual instanceof WriteModification) {
+                assertEquals("getData", ((WriteModification)expected[i]).getData(),
+                        ((WriteModification)actual).getData());
+            }
+        }
+    }
+
+    protected void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
+            Object... expReplies) throws Exception {
+            assertEquals("getReadyOperationFutures size", expReplies.length,
+                    proxy.getCohortFutures().size());
+
+            int i = 0;
+            for( Future<ActorSelection> future: proxy.getCohortFutures()) {
+                assertNotNull("Ready operation Future is null", future);
+
+                Object expReply = expReplies[i++];
+                if(expReply instanceof ActorSelection) {
+                    ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                    assertEquals("Cohort actor path", expReply, actual);
+                } else {
+                    // Expecting exception.
+                    try {
+                        Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                        fail("Expected exception from ready operation Future");
+                    } catch(Exception e) {
+                        // Expected
+                    }
+                }
+            }
+        }
+}
index abfe7eae22a15b69fd4dc1c71df46befe5059e31..8278d3cffceaa6b82357c08c4ab62e33ff53fafe 100644 (file)
@@ -6,11 +6,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
@@ -21,78 +19,44 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
-import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatcher;
 import org.mockito.InOrder;
-import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.Duration;
 
 @SuppressWarnings("resource")
-public class TransactionProxyTest {
+public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @SuppressWarnings("serial")
     static class TestException extends RuntimeException {
@@ -102,291 +66,6 @@ public class TransactionProxyTest {
         CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
     }
 
-    private static ActorSystem system;
-
-    private final Configuration configuration = new MockConfiguration();
-
-    @Mock
-    private ActorContext mockActorContext;
-
-    private SchemaContext schemaContext;
-
-    @Mock
-    private ClusterWrapper mockClusterWrapper;
-
-    private final String memberName = "mock-member";
-
-    private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
-            shardBatchedModificationCount(1);
-
-    @BeforeClass
-    public static void setUpClass() throws IOException {
-
-        Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
-                put("akka.actor.default-dispatcher.type",
-                        "akka.testkit.CallingThreadDispatcherConfigurator").build()).
-                withFallback(ConfigFactory.load());
-        system = ActorSystem.create("test", config);
-    }
-
-    @AfterClass
-    public static void tearDownClass() throws IOException {
-        JavaTestKit.shutdownActorSystem(system);
-        system = null;
-    }
-
-    @Before
-    public void setUp(){
-        MockitoAnnotations.initMocks(this);
-
-        schemaContext = TestModel.createTestContext();
-
-        doReturn(getSystem()).when(mockActorContext).getActorSystem();
-        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
-        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
-        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
-        doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
-        doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
-        doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
-        ShardStrategyFactory.setConfiguration(configuration);
-    }
-
-    private ActorSystem getSystem() {
-        return system;
-    }
-
-    private CreateTransaction eqCreateTransaction(final String memberName,
-            final TransactionType type) {
-        ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
-                    CreateTransaction obj = CreateTransaction.fromSerializable(argument);
-                    return obj.getTransactionId().startsWith(memberName) &&
-                            obj.getTransactionType() == type.ordinal();
-                }
-
-                return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private DataExists eqSerializedDataExists() {
-        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
-            @Override
-            public boolean matches(Object argument) {
-                return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private DataExists eqDataExists() {
-        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
-            @Override
-            public boolean matches(Object argument) {
-                return (argument instanceof DataExists) &&
-                    ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private ReadData eqSerializedReadData() {
-        return eqSerializedReadData(TestModel.TEST_PATH);
-    }
-
-    private ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
-        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       ReadData.fromSerializable(argument).getPath().equals(path);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private ReadData eqReadData() {
-        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return (argument instanceof ReadData) &&
-                    ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
-        ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) {
-                    WriteData obj = WriteData.fromSerializable(argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
-                }
-
-                return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
-        ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) {
-                    MergeData obj = MergeData.fromSerializable(argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
-                }
-
-                return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
-        ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) &&
-                       DeleteData.fromSerializable(argument).getPath().equals(expPath);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private Future<Object> readySerializedTxReply(String path) {
-        return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
-    }
-
-    private Future<Object> readyTxReply(String path) {
-        return Futures.successful((Object)new ReadyTransactionReply(path));
-    }
-
-    private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
-            short transactionVersion) {
-        return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable());
-    }
-
-    private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
-        return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
-    }
-
-    private Future<Object> dataExistsSerializedReply(boolean exists) {
-        return Futures.successful(new DataExistsReply(exists).toSerializable());
-    }
-
-    private Future<DataExistsReply> dataExistsReply(boolean exists) {
-        return Futures.successful(new DataExistsReply(exists));
-    }
-
-    private Future<BatchedModificationsReply> batchedModificationsReply(int count) {
-        return Futures.successful(new BatchedModificationsReply(count));
-    }
-
-    private Future<Object> incompleteFuture(){
-        return mock(Future.class);
-    }
-
-    private ActorSelection actorSelection(ActorRef actorRef) {
-        return getSystem().actorSelection(actorRef.path());
-    }
-
-    private void expectBatchedModifications(ActorRef actorRef, int count) {
-        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
-    }
-
-    private void expectBatchedModifications(int count) {
-        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class));
-    }
-
-    private void expectIncompleteBatchedModifications() {
-        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class));
-    }
-
-    private void expectReadyTransaction(ActorRef actorRef) {
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-    }
-
-    private void expectFailedBatchedModifications(ActorRef actorRef) {
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
-    }
-
-    private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
-        return CreateTransactionReply.newBuilder()
-            .setTransactionActorPath(actorRef.path().toString())
-            .setTransactionId("txn-1")
-            .setMessageVersion(transactionVersion)
-            .build();
-    }
-
-    private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
-        ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-        doReturn(actorSystem.actorSelection(actorRef.path())).
-                when(mockActorContext).actorSelection(actorRef.path().toString());
-
-        doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
-                when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
-        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
-        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
-        return actorRef;
-    }
-
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
-            TransactionType type, int transactionVersion) {
-        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
-
-        doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
-                executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
-                        eqCreateTransaction(memberName, type));
-
-        return actorRef;
-    }
-
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
-        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
-    }
-
-
-    private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
-            throws Throwable {
-
-        try {
-            future.checkedGet(5, TimeUnit.SECONDS);
-            fail("Expected ReadFailedException");
-        } catch(ReadFailedException e) {
-            throw e.getCause();
-        }
-    }
-
     @Test
     public void testRead() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
@@ -840,31 +519,6 @@ public class TransactionProxyTest {
                 BatchedModificationsReply.class);
     }
 
-    private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
-        Object... expReplies) throws Exception {
-        assertEquals("getReadyOperationFutures size", expReplies.length,
-                proxy.getCohortFutures().size());
-
-        int i = 0;
-        for( Future<ActorSelection> future: proxy.getCohortFutures()) {
-            assertNotNull("Ready operation Future is null", future);
-
-            Object expReply = expReplies[i++];
-            if(expReply instanceof ActorSelection) {
-                ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
-                assertEquals("Cohort actor path", expReply, actual);
-            } else {
-                // Expecting exception.
-                try {
-                    Await.result(future, Duration.create(5, TimeUnit.SECONDS));
-                    fail("Expected exception from ready operation Future");
-                } catch(Exception e) {
-                    // Expected
-                }
-            }
-        }
-    }
-
     @Test
     public void testReady() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
@@ -898,74 +552,6 @@ public class TransactionProxyTest {
                 isA(BatchedModifications.class));
     }
 
-    private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
-                READ_WRITE, version);
-
-        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
-
-        doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
-
-        doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode));
-
-        doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
-
-        expectReadyTransaction(actorRef);
-
-        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
-                eq(actorRef.path().toString()));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
-        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
-                get(5, TimeUnit.SECONDS);
-
-        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
-        assertEquals("Response NormalizedNode", testNode, readOptional.get());
-
-        transactionProxy.write(TestModel.TEST_PATH, testNode);
-
-        transactionProxy.merge(TestModel.TEST_PATH, testNode);
-
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class,
-                ShardTransactionMessages.DeleteDataReply.class);
-
-        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
-
-        return actorRef;
-    }
-
-    @Test
-    public void testCompatibilityWithBaseHeliumVersion() throws Exception {
-        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
-
-        verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
-                eq(actorRef.path().toString()));
-    }
-
-    @Test
-    public void testCompatibilityWithHeliumR1Version() throws Exception {
-        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
-
-        verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
-                eq(actorRef.path().toString()));
-    }
-
     @Test
     public void testReadyWithRecordingOperationFailure() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
@@ -1777,49 +1363,4 @@ public class TransactionProxyTest {
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
     }
-
-    private List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
-        ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
-                ArgumentCaptor.forClass(BatchedModifications.class);
-        verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
-                eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
-
-        List<BatchedModifications> batchedModifications = filterCaptured(
-                batchedModificationsCaptor, BatchedModifications.class);
-        return batchedModifications;
-    }
-
-    private <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
-        List<T> captured = new ArrayList<>();
-        for(T c: captor.getAllValues()) {
-            if(type.isInstance(c)) {
-                captured.add(c);
-            }
-        }
-
-        return captured;
-    }
-
-    private void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
-        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
-        verifyBatchedModifications(batchedModifications.get(0), expected);
-    }
-
-    private void verifyBatchedModifications(Object message, Modification... expected) {
-        assertEquals("Message type", BatchedModifications.class, message.getClass());
-        BatchedModifications batchedModifications = (BatchedModifications)message;
-        assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
-        for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
-            Modification actual = batchedModifications.getModifications().get(i);
-            assertEquals("Modification type", expected[i].getClass(), actual.getClass());
-            assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
-                    ((AbstractModification)actual).getPath());
-            if(actual instanceof WriteModification) {
-                assertEquals("getData", ((WriteModification)expected[i]).getData(),
-                        ((WriteModification)actual).getData());
-            }
-        }
-    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java
new file mode 100644 (file)
index 0000000..08c32c9
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+/**
+ * Unit tests for backwards compatibility with pre-Lithium versions.
+ *
+ * @author Thomas Pantelis
+ */
+public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest {
+
+    private WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+        ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
+            @Override
+            public boolean matches(Object argument) {
+                if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) {
+                    WriteData obj = WriteData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
+                }
+
+                return false;
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+        ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
+            @Override
+            public boolean matches(Object argument) {
+                if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) {
+                    MergeData obj = MergeData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
+                }
+
+                return false;
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
+        ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) &&
+                       DeleteData.fromSerializable(argument).getPath().equals(expPath);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
+
+        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(TestModel.TEST_PATH));
+
+        doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
+
+        doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode));
+
+        doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
+
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
+                get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", testNode, readOptional.get());
+
+        transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+        transactionProxy.merge(TestModel.TEST_PATH, testNode);
+
+        transactionProxy.delete(TestModel.TEST_PATH);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        return actorRef;
+    }
+
+    @Test
+    public void testCompatibilityWithBaseHeliumVersion() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
+
+        verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+
+    @Test
+    public void testCompatibilityWithHeliumR1Version() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
+
+        verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+}