Bug 3570: Persist snapshot on follower ApplySnapshot 50/22250/2
authorTom Pantelis <tpanteli@brocade.com>
Tue, 9 Jun 2015 13:32:52 +0000 (09:32 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 10 Jun 2015 04:44:17 +0000 (04:44 +0000)
When a leader installs a snapshot on a follower, the follower now
perists the snapshot.

Change-Id: I56e25aa80f335e41a992ddce084c84c2a345b03b
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 86f58c88f0e6a0d3caf930f1ac59ab617e034894)

12 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java

index cab76e90ce69756f084668bd3f695c12a9a71012..2db595d743b727fdde09a66d57f8791c0545ac2f 100644 (file)
@@ -30,11 +30,18 @@ class RaftActorSnapshotMessageSupport {
 
     private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
         @Override
-        public void apply(Void notUsed) throws Exception {
+        public void apply(Void notUsed) {
             cohort.createSnapshot(context.getActor());
         }
     };
 
+    private final Procedure<byte[]> applySnapshotProcedure = new Procedure<byte[]>() {
+        @Override
+        public void apply(byte[] state) {
+            cohort.applySnapshot(state);
+        }
+    };
+
     RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
             RaftActorSnapshotCohort cohort) {
         this.context = context;
@@ -43,6 +50,7 @@ class RaftActorSnapshotMessageSupport {
         this.log = context.getLogger();
 
         context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
+        context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure);
     }
 
     boolean handleSnapshotMessage(Object message) {
@@ -59,7 +67,7 @@ class RaftActorSnapshotMessageSupport {
             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
             return true;
         } else if (message.equals(COMMIT_SNAPSHOT)) {
-            context.getSnapshotManager().commit(-1);
+            context.getSnapshotManager().commit(-1, currentBehavior);
             return true;
         } else {
             return false;
@@ -84,20 +92,13 @@ class RaftActorSnapshotMessageSupport {
 
         long sequenceNumber = success.metadata().sequenceNr();
 
-        context.getSnapshotManager().commit(sequenceNumber);
+        context.getSnapshotManager().commit(sequenceNumber, currentBehavior);
     }
 
     private void onApplySnapshot(Snapshot snapshot) {
-        if(log.isDebugEnabled()) {
-            log.debug("{}: ApplySnapshot called on Follower Actor " +
-                    "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
-                snapshot.getLastAppliedTerm());
-        }
-
-        cohort.applySnapshot(snapshot.getState());
+        log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
+                snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm());
 
-        //clears the followers log, sets the snapshot index to ensure adjusted-index works
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
-        context.setLastApplied(snapshot.getLastAppliedIndex());
+        context.getSnapshotManager().apply(snapshot);
     }
 }
index 9bbe285c29e863c885c88933c7f82fd7d739a704..f1881f5b0f228a62ee4e6e45a03d07c4b0a5e368 100644 (file)
@@ -38,6 +38,9 @@ public class SnapshotManager implements SnapshotState {
 
     private Procedure<Void> createSnapshotProcedure;
 
+    private Snapshot applySnapshot;
+    private Procedure<byte[]> applySnapshotProcedure;
+
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
         this.LOG = logger;
@@ -58,14 +61,19 @@ public class SnapshotManager implements SnapshotState {
         return currentState.capture(lastLogEntry, replicatedToAllIndex);
     }
 
+    @Override
+    public void apply(Snapshot snapshot) {
+        currentState.apply(snapshot);
+    }
+
     @Override
     public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
         currentState.persist(snapshotBytes, currentBehavior, totalMemory);
     }
 
     @Override
-    public void commit(long sequenceNumber) {
-        currentState.commit(sequenceNumber);
+    public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+        currentState.commit(sequenceNumber, currentBehavior);
     }
 
     @Override
@@ -82,6 +90,10 @@ public class SnapshotManager implements SnapshotState {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
+    public void setApplySnapshotProcedure(Procedure<byte[]> applySnapshotProcedure) {
+        this.applySnapshotProcedure = applySnapshotProcedure;
+    }
+
     public long getLastSequenceNumber() {
         return lastSequenceNumber;
     }
@@ -118,13 +130,18 @@ public class SnapshotManager implements SnapshotState {
             return false;
         }
 
+        @Override
+        public void apply(Snapshot snapshot) {
+            LOG.debug("apply should not be called in state {}", this);
+        }
+
         @Override
         public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
         @Override
-        public void commit(long sequenceNumber) {
+        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
             LOG.debug("commit should not be called in state {}", this);
         }
 
@@ -228,6 +245,19 @@ public class SnapshotManager implements SnapshotState {
             return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
         }
 
+        @Override
+        public void apply(Snapshot snapshot) {
+            applySnapshot = snapshot;
+
+            lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
+
+            LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
+
+            context.getPersistenceProvider().saveSnapshot(snapshot);
+
+            SnapshotManager.this.currentState = PERSISTING;
+        }
+
         @Override
         public String toString() {
             return "Idle";
@@ -322,28 +352,49 @@ public class SnapshotManager implements SnapshotState {
     private class Persisting extends AbstractSnapshotState {
 
         @Override
-        public void commit(long sequenceNumber) {
+        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
             LOG.debug("Snapshot success sequence number:", sequenceNumber);
-            context.getReplicatedLog().snapshotCommit();
+
+            if(applySnapshot != null) {
+                try {
+                    applySnapshotProcedure.apply(applySnapshot.getState());
+
+                    //clears the followers log, sets the snapshot index to ensure adjusted-index works
+                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(applySnapshot, context, currentBehavior));
+                    context.setLastApplied(applySnapshot.getLastAppliedIndex());
+                    context.setCommitIndex(applySnapshot.getLastAppliedIndex());
+                } catch (Exception e) {
+                    LOG.error("Error applying snapshot", e);
+                }
+            } else {
+                context.getReplicatedLog().snapshotCommit();
+            }
+
             context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
             context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
 
             lastSequenceNumber = -1;
+            applySnapshot = null;
             SnapshotManager.this.currentState = IDLE;
         }
 
         @Override
         public void rollback() {
-            context.getReplicatedLog().snapshotRollback();
-
-            LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
-                            "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
-                    context.getReplicatedLog().getSnapshotIndex(),
-                    context.getReplicatedLog().getSnapshotTerm(),
-                    context.getReplicatedLog().size());
+            // Nothing to rollback if we're applying a snapshot from the leader.
+            if(applySnapshot == null) {
+                context.getReplicatedLog().snapshotRollback();
+
+                LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
+                        "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+                        context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm(),
+                        context.getReplicatedLog().size());
+            }
 
+            lastSequenceNumber = -1;
+            applySnapshot = null;
             SnapshotManager.this.currentState = IDLE;
         }
 
index 9949211c63c416d69fb6a97ba6d6ae6bc823c086..3167596cc38ce56c55e67200531ce9afea93b49b 100644 (file)
@@ -38,6 +38,13 @@ public interface SnapshotState {
      */
     boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
 
+    /**
+     * Applies a snapshot on a follower that was installed by the leader.
+     *
+     * @param snapshot the Snapshot to apply.
+     */
+    void apply(Snapshot snapshot);
+
     /**
      * Persist the snapshot
      *
@@ -52,7 +59,7 @@ public interface SnapshotState {
      *
      * @param sequenceNumber
      */
-    void commit(long sequenceNumber);
+    void commit(long sequenceNumber, RaftActorBehavior currentBehavior);
 
     /**
      * Rollback the snapshot
index a47cf118963e1698c68ef2c2706a44c2baaeeb59..74440b5c24c37b7c22ba6ca8fb6ee1a3eed16c38 100644 (file)
@@ -50,6 +50,24 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest {
 
+    public static class SetPeerAddress {
+        private final String peerId;
+        private final String peerAddress;
+
+        public SetPeerAddress(String peerId, String peerAddress) {
+            this.peerId = peerId;
+            this.peerAddress = peerAddress;
+        }
+
+        public String getPeerId() {
+            return peerId;
+        }
+
+        public String getPeerAddress() {
+            return peerAddress;
+        }
+    }
+
     public static class TestRaftActor extends MockRaftActor {
 
         private final TestActorRef<MessageCollectorActor> collectorActor;
@@ -97,6 +115,12 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
                 return;
             }
 
+            if(message instanceof SetPeerAddress) {
+                setPeerAddress(((SetPeerAddress) message).getPeerId().toString(),
+                        ((SetPeerAddress) message).getPeerAddress());
+                return;
+            }
+
             try {
                 if(!dropMessages.containsKey(message.getClass())) {
                     super.handleCommand(message);
index d72d40416f3b3288a1b3f2b1aaa97f48c473af82..c1aa75a12de5028331a818b7c4fb6a9c5a426b38 100644 (file)
@@ -202,6 +202,10 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     @Override
     public void applyRecoverySnapshot(byte[] bytes) {
         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
+        applySnapshotBytes(bytes);
+    }
+
+    private void applySnapshotBytes(byte[] bytes) {
         try {
             Object data = toObject(bytes);
             if (data instanceof List) {
@@ -222,6 +226,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     public void applySnapshot(byte [] snapshot) {
         LOG.info("{}: applySnapshot called", persistenceId());
         snapshotCohortDelegate.applySnapshot(snapshot);
+        applySnapshotBytes(snapshot);
     }
 
     @Override
index 7509aa09a234eef3dc6196d43fb8fc7c96e8836d..7d41508c3f298fed6df2e0b141505cd391522d3f 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
@@ -16,15 +17,12 @@ import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
-import java.util.Arrays;
 import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -92,29 +90,16 @@ public class RaftActorSnapshotMessageSupportTest {
     @Test
     public void testOnApplySnapshot() {
 
-        ReplicatedLog replicatedLog = context.getReplicatedLog();
-        replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
-
-        byte[] snapshotBytes = {1,2,3,4,5};
-
-        ReplicatedLogEntry unAppliedEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
-
         long lastAppliedDuringSnapshotCapture = 1;
         long lastIndexDuringSnapshotCapture = 2;
+        byte[] snapshotBytes = {1,2,3,4,5};
 
-        Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry),
+        Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.<ReplicatedLogEntry>emptyList(),
                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
 
         sendMessageToSupport(new ApplySnapshot(snapshot));
 
-        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
-        assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
-        assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
-        assertEquals("Commit index", -1, context.getCommitIndex());
-        assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
-        assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
-
-        verify(mockCohort).applySnapshot(snapshotBytes);
+        verify(mockSnapshotManager).apply(snapshot);
     }
 
     @Test
@@ -132,7 +117,7 @@ public class RaftActorSnapshotMessageSupportTest {
         long sequenceNumber = 100;
         sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
 
-        verify(mockSnapshotManager).commit(sequenceNumber);
+        verify(mockSnapshotManager).commit(eq(sequenceNumber), same(mockBehavior));
     }
 
     @Test
@@ -149,7 +134,7 @@ public class RaftActorSnapshotMessageSupportTest {
 
         sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
 
-        verify(mockSnapshotManager).commit(-1);
+        verify(mockSnapshotManager).commit(eq(-1L), same(mockBehavior));
     }
 
     @Test
index fb9541c8d9dd598961522ac08a476a7ffb744bb6..eb70c8b028e16258c392e79a3394d040f9c7c291 100644 (file)
@@ -585,7 +585,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
+                leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
 
                 // capture snapshot reply should remove the snapshotted entries only
                 assertEquals(3, leaderActor.getReplicatedLog().size());
@@ -689,7 +689,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
+                followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
 
                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
index 315583fa381506d191db535c06fc23924d1f500b..9fe03f9673c9f55cf8759dc51cba69b2c5bf437d 100644 (file)
@@ -8,15 +8,22 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
 /**
@@ -34,8 +41,9 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
                 newFollowerConfigParams());
 
-        peerAddresses = ImmutableMap.<String, String>builder().
-                put(follower1Id, follower1Actor.path().toString()).build();
+        Map<String, String> peerAddresses = new HashMap<>();
+        peerAddresses.put(follower1Id, follower1Actor.path().toString());
+        peerAddresses.put(follower2Id, "");
 
         leaderConfigParams = newLeaderConfigParams();
         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
@@ -131,6 +139,76 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
                 leaderActor.underlyingActor().getState());
     }
 
+    @Test
+    public void testFollowerRecoveryAfterInstallSnapshot() throws Exception {
+
+        send2InitialPayloads();
+
+        leader = leaderActor.underlyingActor().getCurrentBehavior();
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+                newFollowerConfigParams());
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+
+        leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // Verify the leader applies the 3rd payload state.
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+
+        MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
+
+        assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
+
+        killActor(follower2Actor);
+
+        InMemoryJournal.clear();
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+                newFollowerConfigParams());
+        TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
+        follower2CollectorActor = follower2Underlying.collectorActor();
+        follower2Context = follower2Underlying.getRaftActorContext();
+
+        leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
+
+        // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
+        MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
+
+        // Wait for the follower to persist the snapshot.
+        MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
+
+        // The last applied entry on the leader is included in the snapshot but is also sent in a subsequent
+        // AppendEntries because the InstallSnapshot message lastIncludedIndex field is set to the leader's
+        // snapshotIndex and not the actual last index included in the snapshot.
+        // FIXME? - is this OK?
+        MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
+        List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2, payload2);
+
+        assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
+        assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
+        assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
+
+        killActor(follower2Actor);
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+                newFollowerConfigParams());
+
+        follower2Underlying = follower2Actor.underlyingActor();
+        follower2Underlying.waitForRecoveryComplete();
+        follower2Context = follower2Underlying.getRaftActorContext();
+
+        assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
+        assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
+        assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
+    }
+
     private void reinstateLeaderActor() {
         killActor(leaderActor);
 
index 89aadbe0ae0fb4dc0ba7646e24942a7323be0376..d94eb6b041a52631c5ca02684e0c24bc32b170e6 100644 (file)
@@ -340,7 +340,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
-        snapshotManager.commit(100L);
+        snapshotManager.commit(100L, mockRaftActorBehavior);
 
         verify(mockReplicatedLog).snapshotCommit();
 
@@ -361,7 +361,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.commit(100L);
+        snapshotManager.commit(100L, mockRaftActorBehavior);
 
         verify(mockReplicatedLog, never()).snapshotCommit();
 
@@ -373,7 +373,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCommitBeforeCapture(){
-        snapshotManager.commit(100L);
+        snapshotManager.commit(100L, mockRaftActorBehavior);
 
         verify(mockReplicatedLog, never()).snapshotCommit();
 
@@ -393,9 +393,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
 
-        snapshotManager.commit(100L);
+        snapshotManager.commit(100L, mockRaftActorBehavior);
 
-        snapshotManager.commit(100L);
+        snapshotManager.commit(100L, mockRaftActorBehavior);
 
         verify(mockReplicatedLog, times(1)).snapshotCommit();
 
index db04956f0005ba1f7a8e2b73b5ece7615e37aba3..0b4abe98a10a9ecac5be58a036f0653bcece5e8c 100644 (file)
@@ -590,7 +590,8 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+    @VisibleForTesting
+    public RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
         return snapshotCohort;
     }
 
index 70869ee68a55644cf7aa8c5103031a72a881d703..576678a802c7ec054f773668f200b26f197ed19e 100644 (file)
@@ -80,7 +80,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
@@ -113,6 +112,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
@@ -484,16 +484,20 @@ public class ShardTest extends AbstractShardTest {
         DataTree store = InMemoryDataTreeFactory.getInstance().create();
         store.setSchemaContext(SCHEMA_CONTEXT);
 
-        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                    withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
+                        ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
+
+        writeToStore(store, TestModel.TEST_PATH, container);
 
         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
         NormalizedNode<?,?> expected = readStore(store, root);
 
-        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
-                SerializationUtils.serializeNormalizedNode(expected),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
+        Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
 
-        shard.underlyingActor().onReceiveCommand(applySnapshot);
+        shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
 
         NormalizedNode<?,?> actual = readStore(shard, root);
 
index f1f96d4a7d7acc431e61d8e5688156b5286bd83d..07b7f0e33462dc70df6d025fb8ae36b73795b199 100644 (file)
@@ -50,7 +50,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
@@ -120,11 +119,10 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
         NormalizedNodeMessages.Container encode = codec.encode(expected);
 
-        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
-                encode.getNormalizedNode().toByteString().toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
+        Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
 
-        shard.underlyingActor().onReceiveCommand(applySnapshot);
+        shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
 
         NormalizedNode<?,?> actual = readStore(shard, root);