Fix intermittent ShardTest failures
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 83dae880bad3fceef0133fdf692884ca2297f5c7..51063c8d2cc5fe6b2f9e2eb8b4a16d7ed62e30b3 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
@@ -98,7 +97,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -206,6 +204,8 @@ public class ShardTest extends AbstractShardTest {
                 }
             };
 
+            setupInMemorySnapshotStore();
+
             final MockDataChangeListener listener = new MockDataChangeListener(1);
             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
@@ -214,9 +214,7 @@ public class ShardTest extends AbstractShardTest {
                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testRegisterChangeListenerWhenNotLeaderInitially");
 
-            // Write initial data into the in-memory store.
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
-            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             // Wait until the shard receives the first ElectionTimeout message.
             assertEquals("Got first ElectionTimeout", true,
@@ -231,10 +229,10 @@ public class ShardTest extends AbstractShardTest {
             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
             // Sanity check - verify the shard is not the leader yet.
-            shard.tell(new FindLeader(), getRef());
+            shard.tell(FindLeader.INSTANCE, getRef());
             final FindLeaderReply findLeadeReply =
                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+            assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
 
             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
             // with the election process.
@@ -286,8 +284,7 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(Shard.builder().id(shardID).datastoreContext(
-                            dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
+                    return new Shard(newShardBuilder()) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -311,6 +308,8 @@ public class ShardTest extends AbstractShardTest {
                 }
             };
 
+            setupInMemorySnapshotStore();
+
             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
@@ -320,7 +319,6 @@ public class ShardTest extends AbstractShardTest {
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
-            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             assertEquals("Got first ElectionTimeout", true,
                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
@@ -330,12 +328,11 @@ public class ShardTest extends AbstractShardTest {
                 RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
-            shard.tell(new FindLeader(), getRef());
+            shard.tell(FindLeader.INSTANCE, getRef());
             final FindLeaderReply findLeadeReply =
                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+            assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
 
-            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             onChangeListenerRegistered.countDown();
 
@@ -560,9 +557,9 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
@@ -575,16 +572,16 @@ public class ShardTest extends AbstractShardTest {
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID3).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the CommitTransaction message for the first Tx. After it completes, it should
             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             // Wait for the next 2 Tx's to complete.
 
@@ -618,7 +615,7 @@ public class ShardTest extends AbstractShardTest {
 
             class OnCommitFutureComplete extends OnFutureComplete {
                 OnCommitFutureComplete() {
-                    super(CommitTransactionReply.SERIALIZABLE_CLASS);
+                    super(CommitTransactionReply.class);
                 }
 
                 @Override
@@ -632,7 +629,7 @@ public class ShardTest extends AbstractShardTest {
                 private final String transactionID;
 
                 OnCanCommitFutureComplete(final String transactionID) {
-                    super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
+                    super(CanCommitTransactionReply.class);
                     this.transactionID = transactionID;
                 }
 
@@ -643,7 +640,7 @@ public class ShardTest extends AbstractShardTest {
                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
                     final Future<Object> commitFuture = Patterns.ask(shard,
-                            new CommitTransaction(transactionID).toSerializable(), timeout);
+                            new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
                 }
             }
@@ -726,15 +723,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(mockCohort.get());
             inOrder.verify(mockCohort.get()).canCommit();
@@ -789,7 +786,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(mockCohort.get());
             inOrder.verify(mockCohort.get()).canCommit();
@@ -899,13 +896,13 @@ public class ShardTest extends AbstractShardTest {
 
             // Commit the write transaction.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             // Verify data in the data store.
 
@@ -1005,7 +1002,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
 
-            expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(cohort);
             inOrder.verify(cohort).canCommit();
@@ -1041,7 +1038,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(readyMessage, getRef());
 
-            expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
@@ -1076,15 +1073,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
-            expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
@@ -1123,15 +1120,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(cohort);
             inOrder.verify(cohort).canCommit();
@@ -1175,13 +1172,13 @@ public class ShardTest extends AbstractShardTest {
 
                 // Send the CanCommitTransaction message.
 
-                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                        expectMsgClass(duration, CanCommitTransactionReply.class));
                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+                shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+                expectMsgClass(duration, CommitTransactionReply.class);
 
                 final InOrder inOrder = inOrder(cohort);
                 inOrder.verify(cohort).canCommit();
@@ -1232,13 +1229,13 @@ public class ShardTest extends AbstractShardTest {
 
                 // Send the CanCommitTransaction message.
 
-                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                        expectMsgClass(duration, CanCommitTransactionReply.class));
                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+                shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+                expectMsgClass(duration, CommitTransactionReply.class);
 
                 final InOrder inOrder = inOrder(cohort);
                 inOrder.verify(cohort).canCommit();
@@ -1299,21 +1296,21 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the CommitTransaction message for the first Tx. This should send back an error
             // and trigger the 2nd Tx to proceed.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Wait for the 2nd Tx to complete the canCommit phase.
@@ -1372,21 +1369,21 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the CommitTransaction message for the first Tx. This should send back an error
             // and trigger the 2nd Tx to proceed.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Wait for the 2nd Tx to complete the canCommit phase.
@@ -1434,7 +1431,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Send another can commit to ensure the failed one got cleaned up.
@@ -1447,9 +1444,9 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("getCanCommit", true, reply.getCanCommit());
         }};
     }
@@ -1480,9 +1477,9 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("getCanCommit", false, reply.getCanCommit());
 
             // Send another can commit to ensure the failed one got cleaned up.
@@ -1495,9 +1492,9 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
             reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("getCanCommit", true, reply.getCanCommit());
         }};
     }
@@ -1543,7 +1540,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
         }};
     }
 
@@ -1588,7 +1585,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
         }};
     }
 
@@ -1638,13 +1635,13 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
 
@@ -1710,23 +1707,23 @@ public class ShardTest extends AbstractShardTest {
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // Try to commit the 1st Tx - should fail as it's not the current Tx.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Commit the 2nd Tx.
 
-            shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
             assertNotNull(listNodePath + " not found", node);
@@ -1780,16 +1777,16 @@ public class ShardTest extends AbstractShardTest {
 
             // canCommit 1st Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // canCommit the 2nd Tx - it should get queued.
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
 
             // canCommit the 3rd Tx - should exceed queue capacity and fail.
 
-            shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
         }};
     }
@@ -1836,8 +1833,8 @@ public class ShardTest extends AbstractShardTest {
             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
             // should expire from the queue and the last one should be processed.
 
-            shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
         }};
     }
 
@@ -1866,8 +1863,8 @@ public class ShardTest extends AbstractShardTest {
 
             // CanCommit the first one so it's the current in-progress CohortEntry.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // Ready the second Tx.
 
@@ -1893,12 +1890,12 @@ public class ShardTest extends AbstractShardTest {
             // Commit the first Tx. After completing, the second should expire from the queue and the third
             // Tx committed.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             // Expect commit reply from the third Tx.
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
             assertNotNull(TestModel.TEST2_PATH + " not found", node);
@@ -1912,7 +1909,7 @@ public class ShardTest extends AbstractShardTest {
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testCanCommitBeforeReadyFailure");
 
-            shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction("tx", CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
         }};
     }
@@ -1955,22 +1952,22 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
             // Tx to proceed.
 
-            shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.class);
 
             // Wait for the 2nd Tx to complete the canCommit phase.
 
@@ -2033,8 +2030,8 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the AbortTransaction message.
 
-            shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.class);
 
             verify(cohort).abort();
 
@@ -2048,7 +2045,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Now send CanCommitTransaction - should fail.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
 
             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
             assertTrue("Failure type", failure instanceof IllegalStateException);
@@ -2289,12 +2286,15 @@ public class ShardTest extends AbstractShardTest {
     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
             String testName = "testClusteredDataChangeListenerDelayedRegistration";
-            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
+                    customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
             final MockDataChangeListener listener = new MockDataChangeListener(1);
             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
                     actorFactory.generateActorId(testName + "-DataChangeListener"));
 
+            setupInMemorySnapshotStore();
+
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                     actorFactory.generateActorId(testName + "-shard"));
@@ -2308,9 +2308,8 @@ public class ShardTest extends AbstractShardTest {
                 RegisterChangeListenerReply.class);
             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
-            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-            shard.tell(new ElectionTimeout(), ActorRef.noSender());
+            shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
+                    customRaftPolicyImplementation(null).build(), ActorRef.noSender());
 
             listener.waitForChangeEvents();
         }};
@@ -2363,12 +2362,15 @@ public class ShardTest extends AbstractShardTest {
     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
             String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
-            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
+                    customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
+            setupInMemorySnapshotStore();
+
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                     actorFactory.generateActorId(testName + "-shard"));
@@ -2382,9 +2384,8 @@ public class ShardTest extends AbstractShardTest {
                     RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
-            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-            shard.tell(new ElectionTimeout(), ActorRef.noSender());
+            shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
+                    customRaftPolicyImplementation(null).build(), ActorRef.noSender());
 
             listener.waitForChangeEvents();
         }};