Fix raw reference
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 23a200f6bcda05b11afcd24b248ed8145f400122..c31acdad9361c2dd07053da257bbcf66eb11e3a5 100644 (file)
@@ -58,6 +58,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
@@ -97,9 +98,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -119,8 +117,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 public class ShardTest extends AbstractShardTest {
-    private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
-
     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
 
     @Test
@@ -356,13 +352,13 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shard.tell(new CreateTransaction("txn-1",
-                    TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
+            shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null,
+                    DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
 
             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
                     CreateTransactionReply.class);
 
-            final String path = reply.getTransactionActorPath().toString();
+            final String path = reply.getTransactionPath().toString();
             assertTrue("Unexpected transaction path " + path,
                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
         }};
@@ -375,14 +371,13 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            shard.tell(new CreateTransaction("txn-1",
-                    TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
-                    getRef());
+            shard.tell(new CreateTransaction("txn-1",TransactionType.READ_ONLY.ordinal(), "foobar",
+                    DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
 
             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
                     CreateTransactionReply.class);
 
-            final String path = reply.getTransactionActorPath().toString();
+            final String path = reply.getTransactionPath().toString();
             assertTrue("Unexpected transaction path " + path,
                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
         }};
@@ -564,9 +559,9 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
@@ -579,16 +574,16 @@ public class ShardTest extends AbstractShardTest {
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID3).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the CommitTransaction message for the first Tx. After it completes, it should
             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             // Wait for the next 2 Tx's to complete.
 
@@ -622,7 +617,7 @@ public class ShardTest extends AbstractShardTest {
 
             class OnCommitFutureComplete extends OnFutureComplete {
                 OnCommitFutureComplete() {
-                    super(CommitTransactionReply.SERIALIZABLE_CLASS);
+                    super(CommitTransactionReply.class);
                 }
 
                 @Override
@@ -636,7 +631,7 @@ public class ShardTest extends AbstractShardTest {
                 private final String transactionID;
 
                 OnCanCommitFutureComplete(final String transactionID) {
-                    super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
+                    super(CanCommitTransactionReply.class);
                     this.transactionID = transactionID;
                 }
 
@@ -647,7 +642,7 @@ public class ShardTest extends AbstractShardTest {
                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
                     final Future<Object> commitFuture = Patterns.ask(shard,
-                            new CommitTransaction(transactionID).toSerializable(), timeout);
+                            new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
                 }
             }
@@ -730,15 +725,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(mockCohort.get());
             inOrder.verify(mockCohort.get()).canCommit();
@@ -793,7 +788,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(mockCohort.get());
             inOrder.verify(mockCohort.get()).canCommit();
@@ -891,24 +886,25 @@ public class ShardTest extends AbstractShardTest {
 
             // Create a read Tx on the same chain.
 
-            shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
-                    transactionChainID).toSerializable(), getRef());
+            shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
+                    transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
 
             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
 
-            getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
+            getSystem().actorSelection(createReply.getTransactionPath()).tell(
+                    new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
 
             // Commit the write transaction.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             // Verify data in the data store.
 
@@ -1008,7 +1004,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
 
-            expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(cohort);
             inOrder.verify(cohort).canCommit();
@@ -1044,7 +1040,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(readyMessage, getRef());
 
-            expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
@@ -1079,15 +1075,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
-            expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
@@ -1126,15 +1122,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(cohort);
             inOrder.verify(cohort).canCommit();
@@ -1178,13 +1174,13 @@ public class ShardTest extends AbstractShardTest {
 
                 // Send the CanCommitTransaction message.
 
-                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                        expectMsgClass(duration, CanCommitTransactionReply.class));
                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+                shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+                expectMsgClass(duration, CommitTransactionReply.class);
 
                 final InOrder inOrder = inOrder(cohort);
                 inOrder.verify(cohort).canCommit();
@@ -1235,13 +1231,13 @@ public class ShardTest extends AbstractShardTest {
 
                 // Send the CanCommitTransaction message.
 
-                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                        expectMsgClass(duration, CanCommitTransactionReply.class));
                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+                shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+                expectMsgClass(duration, CommitTransactionReply.class);
 
                 final InOrder inOrder = inOrder(cohort);
                 inOrder.verify(cohort).canCommit();
@@ -1302,21 +1298,21 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the CommitTransaction message for the first Tx. This should send back an error
             // and trigger the 2nd Tx to proceed.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Wait for the 2nd Tx to complete the canCommit phase.
@@ -1375,21 +1371,21 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the CommitTransaction message for the first Tx. This should send back an error
             // and trigger the 2nd Tx to proceed.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Wait for the 2nd Tx to complete the canCommit phase.
@@ -1437,7 +1433,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Send another can commit to ensure the failed one got cleaned up.
@@ -1450,9 +1446,9 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("getCanCommit", true, reply.getCanCommit());
         }};
     }
@@ -1483,9 +1479,9 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("getCanCommit", false, reply.getCanCommit());
 
             // Send another can commit to ensure the failed one got cleaned up.
@@ -1498,9 +1494,9 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
             reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("getCanCommit", true, reply.getCanCommit());
         }};
     }
@@ -1546,7 +1542,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
         }};
     }
 
@@ -1591,7 +1587,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
         }};
     }
 
@@ -1641,13 +1637,13 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
 
@@ -1713,23 +1709,23 @@ public class ShardTest extends AbstractShardTest {
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // Try to commit the 1st Tx - should fail as it's not the current Tx.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Commit the 2nd Tx.
 
-            shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
             assertNotNull(listNodePath + " not found", node);
@@ -1783,16 +1779,16 @@ public class ShardTest extends AbstractShardTest {
 
             // canCommit 1st Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // canCommit the 2nd Tx - it should get queued.
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
 
             // canCommit the 3rd Tx - should exceed queue capacity and fail.
 
-            shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
         }};
     }
@@ -1839,8 +1835,8 @@ public class ShardTest extends AbstractShardTest {
             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
             // should expire from the queue and the last one should be processed.
 
-            shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
         }};
     }
 
@@ -1869,8 +1865,8 @@ public class ShardTest extends AbstractShardTest {
 
             // CanCommit the first one so it's the current in-progress CohortEntry.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // Ready the second Tx.
 
@@ -1896,12 +1892,12 @@ public class ShardTest extends AbstractShardTest {
             // Commit the first Tx. After completing, the second should expire from the queue and the third
             // Tx committed.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             // Expect commit reply from the third Tx.
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
             assertNotNull(TestModel.TEST2_PATH + " not found", node);
@@ -1915,7 +1911,7 @@ public class ShardTest extends AbstractShardTest {
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testCanCommitBeforeReadyFailure");
 
-            shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction("tx", CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
         }};
     }
@@ -1958,22 +1954,22 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
             // Tx to proceed.
 
-            shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.class);
 
             // Wait for the 2nd Tx to complete the canCommit phase.
 
@@ -2036,8 +2032,8 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the AbortTransaction message.
 
-            shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.class);
 
             verify(cohort).abort();
 
@@ -2051,7 +2047,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Now send CanCommitTransaction - should fail.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
 
             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
             assertTrue("Failure type", failure instanceof IllegalStateException);