BUG-5626: do not allow overriding of RaftActor.handleCommand()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index f832d2acc1c114fc5cc1371d919bd5fcc6abf208..3e2313dc2780eab81c8a6c355751f8def2589905 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
@@ -98,7 +97,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -176,7 +174,7 @@ public class ShardTest extends AbstractShardTest {
                     // it does do a persist)
                     return new Shard(newShardBuilder()) {
                         @Override
-                        public void onReceiveCommand(final Object message) throws Exception {
+                        public void handleCommand(final Object message) {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
                                 // Got the first ElectionTimeout. We don't forward it to the
                                 // base Shard yet until we've sent the RegisterChangeListener
@@ -199,13 +197,15 @@ public class ShardTest extends AbstractShardTest {
 
                                 onFirstElectionTimeout.countDown();
                             } else {
-                                super.onReceiveCommand(message);
+                                super.handleCommand(message);
                             }
                         }
                     };
                 }
             };
 
+            setupInMemorySnapshotStore();
+
             final MockDataChangeListener listener = new MockDataChangeListener(1);
             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
@@ -214,9 +214,7 @@ public class ShardTest extends AbstractShardTest {
                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testRegisterChangeListenerWhenNotLeaderInitially");
 
-            // Write initial data into the in-memory store.
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
-            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             // Wait until the shard receives the first ElectionTimeout message.
             assertEquals("Got first ElectionTimeout", true,
@@ -231,10 +229,10 @@ public class ShardTest extends AbstractShardTest {
             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
             // Sanity check - verify the shard is not the leader yet.
-            shard.tell(new FindLeader(), getRef());
+            shard.tell(FindLeader.INSTANCE, getRef());
             final FindLeaderReply findLeadeReply =
                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+            assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
 
             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
             // with the election process.
@@ -286,10 +284,9 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(Shard.builder().id(shardID).datastoreContext(
-                            dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
+                    return new Shard(newShardBuilder()) {
                         @Override
-                        public void onReceiveCommand(final Object message) throws Exception {
+                        public void handleCommand(final Object message) {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
                                 firstElectionTimeout = false;
                                 final ActorRef self = getSelf();
@@ -304,13 +301,15 @@ public class ShardTest extends AbstractShardTest {
 
                                 onFirstElectionTimeout.countDown();
                             } else {
-                                super.onReceiveCommand(message);
+                                super.handleCommand(message);
                             }
                         }
                     };
                 }
             };
 
+            setupInMemorySnapshotStore();
+
             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
@@ -320,7 +319,6 @@ public class ShardTest extends AbstractShardTest {
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
-            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             assertEquals("Got first ElectionTimeout", true,
                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
@@ -330,12 +328,11 @@ public class ShardTest extends AbstractShardTest {
                 RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
-            shard.tell(new FindLeader(), getRef());
+            shard.tell(FindLeader.INSTANCE, getRef());
             final FindLeaderReply findLeadeReply =
                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+            assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
 
-            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             onChangeListenerRegistered.countDown();
 
@@ -560,9 +557,9 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
@@ -575,16 +572,16 @@ public class ShardTest extends AbstractShardTest {
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID3).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the CommitTransaction message for the first Tx. After it completes, it should
             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             // Wait for the next 2 Tx's to complete.
 
@@ -618,7 +615,7 @@ public class ShardTest extends AbstractShardTest {
 
             class OnCommitFutureComplete extends OnFutureComplete {
                 OnCommitFutureComplete() {
-                    super(CommitTransactionReply.SERIALIZABLE_CLASS);
+                    super(CommitTransactionReply.class);
                 }
 
                 @Override
@@ -632,7 +629,7 @@ public class ShardTest extends AbstractShardTest {
                 private final String transactionID;
 
                 OnCanCommitFutureComplete(final String transactionID) {
-                    super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
+                    super(CanCommitTransactionReply.class);
                     this.transactionID = transactionID;
                 }
 
@@ -643,7 +640,7 @@ public class ShardTest extends AbstractShardTest {
                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
                     final Future<Object> commitFuture = Patterns.ask(shard,
-                            new CommitTransaction(transactionID).toSerializable(), timeout);
+                            new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
                 }
             }
@@ -726,15 +723,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(mockCohort.get());
             inOrder.verify(mockCohort.get()).canCommit();
@@ -789,7 +786,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(mockCohort.get());
             inOrder.verify(mockCohort.get()).canCommit();
@@ -892,19 +889,20 @@ public class ShardTest extends AbstractShardTest {
 
             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
 
-            getSystem().actorSelection(createReply.getTransactionPath()).tell(new ReadData(path), getRef());
+            getSystem().actorSelection(createReply.getTransactionPath()).tell(
+                    new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
 
             // Commit the write transaction.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             // Verify data in the data store.
 
@@ -979,8 +977,12 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testReadyWithImmediateCommit() throws Exception{
+    public void testReadyWithReadWriteImmediateCommit() throws Exception{
         testReadyWithImmediateCommit(true);
+    }
+
+    @Test
+    public void testReadyWithWriteOnlyImmediateCommit() throws Exception{
         testReadyWithImmediateCommit(false);
     }
 
@@ -1004,7 +1006,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
 
-            expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(cohort);
             inOrder.verify(cohort).canCommit();
@@ -1040,7 +1042,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(readyMessage, getRef());
 
-            expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
@@ -1075,15 +1077,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
-            expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
@@ -1091,9 +1093,13 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testCommitWithPersistenceDisabled() throws Throwable {
+    public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
+        testCommitWithPersistenceDisabled(true);
+    }
+
+    @Test
+    public void testWriteOnlyCommitWithPersistenceDisabled() throws Throwable {
         testCommitWithPersistenceDisabled(true);
-        testCommitWithPersistenceDisabled(false);
     }
 
     private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
@@ -1122,15 +1128,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final InOrder inOrder = inOrder(cohort);
             inOrder.verify(cohort).canCommit();
@@ -1143,8 +1149,12 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testCommitWhenTransactionHasNoModifications() {
+    public void testReadWriteCommitWhenTransactionHasNoModifications() {
         testCommitWhenTransactionHasNoModifications(true);
+    }
+
+    @Test
+    public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
         testCommitWhenTransactionHasNoModifications(false);
     }
 
@@ -1174,13 +1184,13 @@ public class ShardTest extends AbstractShardTest {
 
                 // Send the CanCommitTransaction message.
 
-                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                        expectMsgClass(duration, CanCommitTransactionReply.class));
                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+                shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+                expectMsgClass(duration, CommitTransactionReply.class);
 
                 final InOrder inOrder = inOrder(cohort);
                 inOrder.verify(cohort).canCommit();
@@ -1201,8 +1211,12 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testCommitWhenTransactionHasModifications() {
+    public void testReadWriteCommitWhenTransactionHasModifications() {
         testCommitWhenTransactionHasModifications(true);
+    }
+
+    @Test
+    public void testWriteOnlyCommitWhenTransactionHasModifications() {
         testCommitWhenTransactionHasModifications(false);
     }
 
@@ -1231,13 +1245,13 @@ public class ShardTest extends AbstractShardTest {
 
                 // Send the CanCommitTransaction message.
 
-                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                        expectMsgClass(duration, CanCommitTransactionReply.class));
                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+                shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+                expectMsgClass(duration, CommitTransactionReply.class);
 
                 final InOrder inOrder = inOrder(cohort);
                 inOrder.verify(cohort).canCommit();
@@ -1279,7 +1293,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
-            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+            doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
 
             final String transactionID2 = "tx2";
@@ -1298,21 +1312,21 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the CommitTransaction message for the first Tx. This should send back an error
             // and trigger the 2nd Tx to proceed.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Wait for the 2nd Tx to complete the canCommit phase.
@@ -1371,21 +1385,21 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the CommitTransaction message for the first Tx. This should send back an error
             // and trigger the 2nd Tx to proceed.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Wait for the 2nd Tx to complete the canCommit phase.
@@ -1433,7 +1447,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Send another can commit to ensure the failed one got cleaned up.
@@ -1446,9 +1460,9 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("getCanCommit", true, reply.getCanCommit());
         }};
     }
@@ -1479,9 +1493,9 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("getCanCommit", false, reply.getCanCommit());
 
             // Send another can commit to ensure the failed one got cleaned up.
@@ -1494,9 +1508,9 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
             reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(CanCommitTransactionReply.class));
             assertEquals("getCanCommit", true, reply.getCanCommit());
         }};
     }
@@ -1542,7 +1556,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
         }};
     }
 
@@ -1587,7 +1601,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
         }};
     }
 
@@ -1637,13 +1651,13 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
 
@@ -1709,23 +1723,23 @@ public class ShardTest extends AbstractShardTest {
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // Try to commit the 1st Tx - should fail as it's not the current Tx.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Commit the 2nd Tx.
 
-            shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
             assertNotNull(listNodePath + " not found", node);
@@ -1779,16 +1793,16 @@ public class ShardTest extends AbstractShardTest {
 
             // canCommit 1st Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // canCommit the 2nd Tx - it should get queued.
 
-            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
 
             // canCommit the 3rd Tx - should exceed queue capacity and fail.
 
-            shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
         }};
     }
@@ -1835,8 +1849,8 @@ public class ShardTest extends AbstractShardTest {
             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
             // should expire from the queue and the last one should be processed.
 
-            shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
         }};
     }
 
@@ -1865,8 +1879,8 @@ public class ShardTest extends AbstractShardTest {
 
             // CanCommit the first one so it's the current in-progress CohortEntry.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             // Ready the second Tx.
 
@@ -1892,12 +1906,12 @@ public class ShardTest extends AbstractShardTest {
             // Commit the first Tx. After completing, the second should expire from the queue and the third
             // Tx committed.
 
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             // Expect commit reply from the third Tx.
 
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+            expectMsgClass(duration, CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
             assertNotNull(TestModel.TEST2_PATH + " not found", node);
@@ -1911,7 +1925,7 @@ public class ShardTest extends AbstractShardTest {
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testCanCommitBeforeReadyFailure");
 
-            shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction("tx", CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
         }};
     }
@@ -1954,22 +1968,22 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the CanCommitTransaction message for the first Tx.
 
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
             // processed after the first Tx completes.
 
             final Future<Object> canCommitFuture = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+                    new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
 
             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
             // Tx to proceed.
 
-            shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.class);
 
             // Wait for the 2nd Tx to complete the canCommit phase.
 
@@ -1997,9 +2011,9 @@ public class ShardTest extends AbstractShardTest {
                 public Shard create() throws Exception {
                     return new Shard(newShardBuilder()) {
                         @Override
-                        public void onReceiveCommand(final Object message) throws Exception {
-                            super.onReceiveCommand(message);
-                            if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+                        public void handleCommand(final Object message) {
+                            super.handleCommand(message);
+                            if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
                                 if(cleaupCheckLatch.get() != null) {
                                     cleaupCheckLatch.get().countDown();
                                 }
@@ -2032,8 +2046,8 @@ public class ShardTest extends AbstractShardTest {
 
             // Send the AbortTransaction message.
 
-            shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
-            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.class);
 
             verify(cohort).abort();
 
@@ -2047,7 +2061,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Now send CanCommitTransaction - should fail.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
 
             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
             assertTrue("Failure type", failure instanceof IllegalStateException);
@@ -2096,7 +2110,8 @@ public class ShardTest extends AbstractShardTest {
                 public void handleCommand(final Object message) {
                     super.handleCommand(message);
 
-                    if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+                    // XXX:  commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
+                    if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
                         latch.get().countDown();
                     }
                 }
@@ -2275,11 +2290,11 @@ public class ShardTest extends AbstractShardTest {
                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 "testFollowerInitialSyncStatus");
 
-        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
+        shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
 
         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
 
-        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
+        shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
 
         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
     }
@@ -2288,12 +2303,15 @@ public class ShardTest extends AbstractShardTest {
     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
             String testName = "testClusteredDataChangeListenerDelayedRegistration";
-            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
+                    customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
             final MockDataChangeListener listener = new MockDataChangeListener(1);
             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
                     actorFactory.generateActorId(testName + "-DataChangeListener"));
 
+            setupInMemorySnapshotStore();
+
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                     actorFactory.generateActorId(testName + "-shard"));
@@ -2307,9 +2325,8 @@ public class ShardTest extends AbstractShardTest {
                 RegisterChangeListenerReply.class);
             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
-            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-            shard.tell(new ElectionTimeout(), ActorRef.noSender());
+            shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
+                    customRaftPolicyImplementation(null).build(), ActorRef.noSender());
 
             listener.waitForChangeEvents();
         }};
@@ -2338,7 +2355,7 @@ public class ShardTest extends AbstractShardTest {
                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
-            leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+            leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
             String leaderPath = waitUntilLeader(followerShard);
             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
 
@@ -2362,12 +2379,15 @@ public class ShardTest extends AbstractShardTest {
     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
             String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
-            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
+                    customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
+            setupInMemorySnapshotStore();
+
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                     actorFactory.generateActorId(testName + "-shard"));
@@ -2381,9 +2401,8 @@ public class ShardTest extends AbstractShardTest {
                     RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
-            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-            shard.tell(new ElectionTimeout(), ActorRef.noSender());
+            shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
+                    customRaftPolicyImplementation(null).build(), ActorRef.noSender());
 
             listener.waitForChangeEvents();
         }};
@@ -2412,7 +2431,7 @@ public class ShardTest extends AbstractShardTest {
                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
-            leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+            leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
             String leaderPath = waitUntilLeader(followerShard);
             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);