Bug 5460: Fix snaphots on follower 74/36274/2
authorTom Pantelis <tpanteli@brocade.com>
Tue, 15 Mar 2016 23:51:58 +0000 (19:51 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 16 Mar 2016 00:09:51 +0000 (20:09 -0400)
Added a callback to the appendAndPersist call in Follower to call
captureSnapshotIfReady.

Added checks in ReplicationAndSnapshotsIntegrationTest to verify the
followers snapshot along with the leader.

Change-Id: Ie71f1b16152541d069f9d005ba669cb1e5771dd1
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java

index afb8f29..33ed335 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -37,12 +38,18 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
-
-    private SnapshotTracker snapshotTracker = null;
+    private static final int SYNC_THRESHOLD = 10;
 
     private final SyncStatusTracker initialSyncStatusTracker;
 
-    private static final int SYNC_THRESHOLD = 10;
+    private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
+        @Override
+        public void apply(ReplicatedLogEntry logEntry) {
+            context.getReplicatedLog().captureSnapshotIfReady(logEntry);
+        }
+    };
+
+    private SnapshotTracker snapshotTracker = null;
 
     public Follower(RaftActorContext context) {
         this(context, null, (short)-1);
@@ -195,7 +202,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
                 LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
 
-                context.getReplicatedLog().appendAndPersist(entry);
+                context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback);
 
                 if(entry.getData() instanceof ServerConfigurationPayload) {
                     context.updatePeerIds((ServerConfigurationPayload)entry.getData());
index 949889a..6334173 100644 (file)
@@ -62,11 +62,13 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         // Create the leader and 2 follower actors and verify initial syncing of the followers after leader
         // persistence recovery.
 
+        DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
+        followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
-                follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
+                follower2Id, testActorPath(follower2Id)), followerConfigParams);
 
         follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
-                follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
+                follower1Id, testActorPath(follower1Id)), followerConfigParams);
 
         peerAddresses = ImmutableMap.<String, String>builder().
                 put(follower1Id, follower1Actor.path().toString()).
@@ -226,6 +228,18 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader last applied", 3, leaderContext.getLastApplied());
         assertEquals("Leader replicatedToAllIndex", 2, leader.getReplicatedToAllIndex());
 
+        // The followers should also snapshot so verify.
+
+        MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SaveSnapshotSuccess.class);
+        persistedSnapshots = InMemorySnapshotStore.getSnapshots(follower1Id, Snapshot.class);
+        assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
+        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
+        unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
+        assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
+        verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
+
+        MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
+
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.