Merge "BUG 2773 : Transition Shard to Leader state when it has no peers"
authorTom Pantelis <tpanteli@brocade.com>
Fri, 27 Mar 2015 12:52:22 +0000 (12:52 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 27 Mar 2015 12:52:23 +0000 (12:52 +0000)
1  2 
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 2a3653ee91d1bc4ba30dca5a1f229412aba50534,45671ea31e4c804f9993df96e3d534ceaf6e4247..c276d32cce33d5b5bfada40f7f62afb6244a2e07
@@@ -39,6 -39,8 +39,8 @@@ import scala.concurrent.duration.Finite
   */
  public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
  
+     protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout();
      /**
       * Information about the RaftActor whose behavior this class represents
       */
          // message is sent to itself
          electionCancel =
              context.getActorSystem().scheduler().scheduleOnce(interval,
-                 context.getActor(), new ElectionTimeout(),
+                 context.getActor(), ELECTION_TIMEOUT,
                  context.getActorSystem().dispatcher(), context.getActor());
      }
  
       * @param snapshotCapturedIndex
       */
      protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
 -        //  we would want to keep the lastApplied as its used while capturing snapshots
 -        long lastApplied = context.getLastApplied();
 -        long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
 +        long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
  
 -        if(LOG.isTraceEnabled()) {
 -            LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}",
 -                    logName, snapshotCapturedIndex, lastApplied, tempMin);
 -        }
 -
 -        if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
 -            LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
 -                    context.getTermInformation().getCurrentTerm());
 -
 -            //use the term of the temp-min, since we check for isPresent, entry will not be null
 -            ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
 -            context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
 -            context.getReplicatedLog().snapshotCommit();
 -            setReplicatedToAllIndex(tempMin);
 -        } else if(tempMin > getReplicatedToAllIndex()) {
 -            // It's possible a follower was lagging and an install snapshot advanced its match index past
 -            // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
 -            // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
 -            // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
 -            // trim the log to the last applied index even if previous entries weren't replicated to all followers.
 -            setReplicatedToAllIndex(tempMin);
 +        if(actualIndex != -1){
 +            setReplicatedToAllIndex(actualIndex);
          }
      }
  
index 6a29a348b8420aa0e77fd9c93f3ada12b4578fa4,a1174d70dcfd40d4272f3ef664bc8ed3c78d6868..a6722e6ff98dbbe9ab68df6c9e04915c23c8721a
@@@ -46,9 -46,14 +46,14 @@@ public class Follower extends AbstractR
      public Follower(RaftActorContext context) {
          super(context, RaftState.Follower);
  
-         scheduleElection(electionDuration());
          initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
+         if(context.getPeerAddresses().isEmpty()){
+             actor().tell(ELECTION_TIMEOUT, actor());
+         } else {
+             scheduleElection(electionDuration());
+         }
      }
  
      private boolean isLogEntryPresent(long index){
  
          sender.tell(reply, actor());
  
 -        if (!context.isSnapshotCaptureInitiated()) {
 +        if (!context.getSnapshotManager().isCapturing()) {
              super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
          }
  
index b93b73958baede1e6e7e51560bed0386d4587a65,a3b070e2cc4e91a60b0d1a073010b3ed15955289..0a4a2c7717facfcc9fc883c2234d4577ff49f876
@@@ -31,6 -31,7 +31,7 @@@ import akka.testkit.JavaTestKit
  import akka.testkit.TestActorRef;
  import akka.util.Timeout;
  import com.google.common.base.Optional;
+ import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Lists;
  import com.google.common.util.concurrent.Uninterruptibles;
  import com.google.protobuf.ByteString;
@@@ -60,6 -61,7 +61,6 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
  import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
  import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
  import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
  import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
  import org.opendaylight.controller.cluster.raft.behaviors.Follower;
@@@ -158,6 -160,16 +159,16 @@@ public class RaftActorTest extends Abst
              }
          }
  
+         public void waitUntilLeader(){
+             for(int i = 0;i < 10; i++){
+                 if(isLeader()){
+                     break;
+                 }
+                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+             }
+         }
          public List<Object> getState() {
              return state;
          }
              return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
          }
  
+         public static Props props(final String id, final Map<String, String> peerAddresses,
+                                   Optional<ConfigParams> config, ActorRef roleChangeNotifier,
+                                   DataPersistenceProvider dataPersistenceProvider){
+             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
+         }
          @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
              delegate.applyState(clientActor, identifier, data);
              LOG.info("{}: applyState called", persistenceId());
  
                  mockRaftActor.waitForInitializeBehaviorComplete();
  
+                 mockRaftActor.waitUntilLeader();
                  mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
  
                  mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
  
-                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
+                 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
              }
          };
      }
  
                  mockRaftActor.waitForInitializeBehaviorComplete();
  
+                 mockRaftActor.waitUntilLeader();
                  mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
  
-                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
+                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
  
              }
  
                          new MockRaftActorContext.MockPayload("C"),
                          new MockRaftActorContext.MockPayload("D")));
  
 -                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
 -
                  RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
  
 +                raftActorContext.getSnapshotManager().capture(
 +                        new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
 +                                new MockRaftActorContext.MockPayload("D")), -1);
 +
                  mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
  
                  mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
                  DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
  
                  TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                         ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
  
                  MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
  
                  mockRaftActor.waitForInitializeBehaviorComplete();
 +                MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
  
                  mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
                  mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
                  mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
                  mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
 -                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
 +                mockRaftActor.getReplicatedLog().append(lastEntry);
  
                  ByteString snapshotBytes = fromObject(Arrays.asList(
                          new MockRaftActorContext.MockPayload("A"),
                  mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
  
                  long replicatedToAllIndex = 1;
 -                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
 +
 +                mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
  
                  verify(mockRaftActor.delegate).createSnapshot();
  
  
                  mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
  
 -                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
 +                raftActorContext.getSnapshotManager().capture(
 +                        new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
 +                                new MockRaftActorContext.MockPayload("D")), 1);
  
                  mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
  
      }
  
      @Test
-     public void testRaftRoleChangeNotifier() throws Exception {
+     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
          new JavaTestKit(getSystem()) {{
              TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
                      Props.create(MessageCollectorActor.class));
              DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
              long heartBeatInterval = 100;
              config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
-             config.setElectionTimeoutFactor(1);
+             config.setElectionTimeoutFactor(20);
  
              String persistenceId = factory.generateActorId("notifier-");
  
              TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
+                     new NonPersistentProvider()), persistenceId);
  
              List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
  
              // check if the notifier got a role change from null to Follower
              RoleChanged raftRoleChanged = matches.get(0);
              assertEquals(persistenceId, raftRoleChanged.getMemberId());
          }};
      }
  
+     @Test
+     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
+         new JavaTestKit(getSystem()) {{
+             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+             MessageCollectorActor.waitUntilReady(notifierActor);
+             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+             long heartBeatInterval = 100;
+             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+             config.setElectionTimeoutFactor(1);
+             String persistenceId = factory.generateActorId("notifier-");
+             factory.createActor(MockRaftActor.props(persistenceId,
+                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+             List<RoleChanged> matches =  null;
+             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
+                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+                 assertNotNull(matches);
+                 if(matches.size() == 3) {
+                     break;
+                 }
+                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
+             }
+             assertEquals(2, matches.size());
+             // check if the notifier got a role change from null to Follower
+             RoleChanged raftRoleChanged = matches.get(0);
+             assertEquals(persistenceId, raftRoleChanged.getMemberId());
+             assertNull(raftRoleChanged.getOldRole());
+             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+             // check if the notifier got a role change from Follower to Candidate
+             raftRoleChanged = matches.get(1);
+             assertEquals(persistenceId, raftRoleChanged.getMemberId());
+             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+         }};
+     }
      @Test
      public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
          new JavaTestKit(getSystem()) {
  
                  assertEquals(8, leaderActor.getReplicatedLog().size());
  
 -                leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
 +                leaderActor.getRaftActorContext().getSnapshotManager()
 +                        .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
 +                                new MockRaftActorContext.MockPayload("x")), 4);
  
 -                leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                  verify(leaderActor.delegate).createSnapshot();
  
                  assertEquals(8, leaderActor.getReplicatedLog().size());
                          new MockRaftActorContext.MockPayload("foo-2"),
                          new MockRaftActorContext.MockPayload("foo-3"),
                          new MockRaftActorContext.MockPayload("foo-4")));
 -                leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 -                assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
 +
 +                leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
 +                        , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
 +
 +                assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 +
 +                // The commit is needed to complete the snapshot creation process
 +                leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
  
                  // capture snapshot reply should remove the snapshotted entries only
                  assertEquals(3, leaderActor.getReplicatedLog().size());
                  assertEquals(6, followerActor.getReplicatedLog().size());
  
                  //snapshot on 4
 -                followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
 +                followerActor.getRaftActorContext().getSnapshotManager().capture(
 +                        new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
 +                                new MockRaftActorContext.MockPayload("D")), 4);
  
 -                followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                  verify(followerActor.delegate).createSnapshot();
  
                  assertEquals(6, followerActor.getReplicatedLog().size());
                          new MockRaftActorContext.MockPayload("foo-3"),
                          new MockRaftActorContext.MockPayload("foo-4")));
                  followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 -                assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
 +                assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
 +
 +                // The commit is needed to complete the snapshot creation process
 +                followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
  
                  // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                  assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
                          new MockRaftActorContext.MockPayload("foo-3"),
                          new MockRaftActorContext.MockPayload("foo-4")));
                  leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 -                assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
 +                assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
  
                  assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
  
  
              // Trimming log in this scenario is a no-op
              assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
 -            assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
 +            assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
              assertEquals(-1, leader.getReplicatedToAllIndex());
  
          }};
  
              // Trimming log in this scenario is a no-op
              assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
 -            assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
 +            assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
              assertEquals(3, leader.getReplicatedToAllIndex());
  
          }};
index 1dda2791c063d48966ec778e98bc462c20521e0f,095e756409ab52aa98f0033d307f80a0bcdf0cd7..ba0bd0f29c96237ab758487b719eac959d115edc
@@@ -48,6 -48,7 +48,7 @@@ import scala.concurrent.duration.Finite
  public class LeaderTest extends AbstractLeaderTest {
  
      static final String FOLLOWER_ID = "follower";
+     public static final String LEADER_ID = "leader";
  
      private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
              Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
                  new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
                          new MockRaftActorContext.MockPayload("D"));
  
 +        actorContext.getReplicatedLog().append(entry);
 +
          //update follower timestamp
          leader.markFollowerActive(FOLLOWER_ID);
  
  
      @Override
      protected MockRaftActorContext createActorContext(ActorRef actorRef) {
-         return createActorContext("leader", actorRef);
+         return createActorContext(LEADER_ID, actorRef);
      }
  
      private MockRaftActorContext createActorContextWithFollower() {
          MockRaftActorContext leaderActorContext = createActorContext();
  
          MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
  
          Follower follower = new Follower(followerActorContext);
          followerActor.underlyingActor().setBehavior(follower);
  
-         Map<String, String> peerAddresses = new HashMap<>();
-         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+         Map<String, String> leaderPeerAddresses = new HashMap<>();
+         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
  
-         leaderActorContext.setPeerAddresses(peerAddresses);
+         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
  
          leaderActorContext.getReplicatedLog().removeFrom(0);
  
          MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
  
          followerActorContext.setConfigParams(configParams);
+         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
  
          Follower follower = new Follower(followerActorContext);
          followerActor.underlyingActor().setBehavior(follower);
index efcb0c9bfd6da4ae3cf0dce5681d3931eaa1271e,78a451953bc955a73b0853bb2ec27faec8440ea3..cc96d0d3b0d070623c737dc8f78340c45a20539f
@@@ -41,7 -41,6 +41,7 @@@ import java.util.concurrent.atomic.Atom
  import org.junit.Test;
  import org.mockito.InOrder;
  import org.opendaylight.controller.cluster.DataPersistenceProvider;
 +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
  import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
  import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@@ -70,13 -69,13 +70,13 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
  import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
  import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 +import org.opendaylight.controller.cluster.raft.RaftActorContext;
  import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
  import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
  import org.opendaylight.controller.cluster.raft.Snapshot;
  import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
  import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
  import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
  import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
  import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
  import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@@ -100,7 -99,6 +100,7 @@@ import org.opendaylight.yangtools.yang.
  import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
  import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 +import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  import scala.concurrent.Await;
  import scala.concurrent.Future;
  import scala.concurrent.duration.FiniteDuration;
@@@ -160,8 -158,12 +160,12 @@@ public class ShardTest extends Abstract
  
                  @Override
                  public Shard create() throws Exception {
+                     // Use a non persistent provider because this test actually invokes persist on the journal
+                     // this will cause all other messages to not be queued properly after that.
+                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
+                     // it does do a persist)
                      return new Shard(shardID, Collections.<String,String>emptyMap(),
-                             newDatastoreContext(), SCHEMA_CONTEXT) {
+                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
                          @Override
                          public void onReceiveCommand(final Object message) throws Exception {
                              if(message instanceof ElectionTimeout && firstElectionTimeout) {
  
                  // Use MBean for verification
                  // Committed transaction count should increase as usual
-                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+                 assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
  
                  // Commit index should advance as we do not have an empty modification
                  assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
  
          dataStoreContextBuilder.persistent(persistent);
  
 +
 +
          new ShardTestKit(getSystem()) {{
              final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
 -            Creator<Shard> creator = new Creator<Shard>() {
 -                @Override
 -                public Shard create() throws Exception {
 -                    return new Shard(shardID, Collections.<String,String>emptyMap(),
 -                            newDatastoreContext(), SCHEMA_CONTEXT) {
  
 -                        DelegatingPersistentDataProvider delegating;
 +            class TestShard extends Shard {
  
 -                        @Override
 -                        protected DataPersistenceProvider persistence() {
 -                            if(delegating == null) {
 -                                delegating = new DelegatingPersistentDataProvider(super.persistence());
 -                            }
 +                protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
 +                                    DatastoreContext datastoreContext, SchemaContext schemaContext) {
 +                    super(name, peerAddresses, datastoreContext, schemaContext);
 +                }
  
 -                            return delegating;
 -                        }
 +                DelegatingPersistentDataProvider delegating;
  
 -                        @Override
 -                        protected void commitSnapshot(final long sequenceNumber) {
 -                            super.commitSnapshot(sequenceNumber);
 -                            latch.get().countDown();
 -                        }
 -                    };
 +                protected DataPersistenceProvider persistence() {
 +                    if(delegating == null) {
 +                        delegating = new DelegatingPersistentDataProvider(super.persistence());
 +                    }
 +                    return delegating;
 +                }
 +
 +                @Override
 +                protected void commitSnapshot(final long sequenceNumber) {
 +                    super.commitSnapshot(sequenceNumber);
 +                    latch.get().countDown();
 +                }
 +
 +                @Override
 +                public RaftActorContext getRaftActorContext() {
 +                    return super.getRaftActorContext();
 +                }
 +            }
 +
 +            Creator<Shard> creator = new Creator<Shard>() {
 +                @Override
 +                public Shard create() throws Exception {
 +                    return new TestShard(shardID, Collections.<String,String>emptyMap(),
 +                            newDatastoreContext(), SCHEMA_CONTEXT);
                  }
              };
  
  
              NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
  
 -            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
 -            shard.tell(capture, getRef());
 +            // Trigger creation of a snapshot by ensuring
 +            RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
 +            raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
  
              assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
  
              latch.set(new CountDownLatch(1));
              savedSnapshot.set(null);
  
 -            shard.tell(capture, getRef());
 +            raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
  
              assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));