From 55e018bfad0c70b773641142d6fbf009cd67fda4 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 9 Jun 2015 09:32:52 -0400 Subject: [PATCH] Bug 3570: Persist snapshot on follower ApplySnapshot When a leader installs a snapshot on a follower, the follower now perists the snapshot. Change-Id: I56e25aa80f335e41a992ddce084c84c2a345b03b Signed-off-by: Tom Pantelis (cherry picked from commit 86f58c88f0e6a0d3caf930f1ac59ab617e034894) --- .../raft/RaftActorSnapshotMessageSupport.java | 27 +++--- .../cluster/raft/SnapshotManager.java | 75 ++++++++++++++--- .../cluster/raft/SnapshotState.java | 9 +- .../AbstractRaftActorIntegrationTest.java | 24 ++++++ .../cluster/raft/MockRaftActor.java | 5 ++ .../RaftActorSnapshotMessageSupportTest.java | 27 ++---- .../cluster/raft/RaftActorTest.java | 4 +- .../cluster/raft/RecoveryIntegrationTest.java | 82 ++++++++++++++++++- .../cluster/raft/SnapshotManagerTest.java | 10 +-- .../controller/cluster/datastore/Shard.java | 3 +- .../cluster/datastore/ShardTest.java | 16 ++-- .../datastore/compat/PreLithiumShardTest.java | 8 +- 12 files changed, 222 insertions(+), 68 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index cab76e90ce..2db595d743 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -30,11 +30,18 @@ class RaftActorSnapshotMessageSupport { private final Procedure createSnapshotProcedure = new Procedure() { @Override - public void apply(Void notUsed) throws Exception { + public void apply(Void notUsed) { cohort.createSnapshot(context.getActor()); } }; + private final Procedure applySnapshotProcedure = new Procedure() { + @Override + public void apply(byte[] state) { + cohort.applySnapshot(state); + } + }; + RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) { this.context = context; @@ -43,6 +50,7 @@ class RaftActorSnapshotMessageSupport { this.log = context.getLogger(); context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure); + context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure); } boolean handleSnapshotMessage(Object message) { @@ -59,7 +67,7 @@ class RaftActorSnapshotMessageSupport { onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); return true; } else if (message.equals(COMMIT_SNAPSHOT)) { - context.getSnapshotManager().commit(-1); + context.getSnapshotManager().commit(-1, currentBehavior); return true; } else { return false; @@ -84,20 +92,13 @@ class RaftActorSnapshotMessageSupport { long sequenceNumber = success.metadata().sequenceNr(); - context.getSnapshotManager().commit(sequenceNumber); + context.getSnapshotManager().commit(sequenceNumber, currentBehavior); } private void onApplySnapshot(Snapshot snapshot) { - if(log.isDebugEnabled()) { - log.debug("{}: ApplySnapshot called on Follower Actor " + - "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(), - snapshot.getLastAppliedTerm()); - } - - cohort.applySnapshot(snapshot.getState()); + log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(), + snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm()); - //clears the followers log, sets the snapshot index to ensure adjusted-index works - context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior)); - context.setLastApplied(snapshot.getLastAppliedIndex()); + context.getSnapshotManager().apply(snapshot); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 9bbe285c29..f1881f5b0f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -38,6 +38,9 @@ public class SnapshotManager implements SnapshotState { private Procedure createSnapshotProcedure; + private Snapshot applySnapshot; + private Procedure applySnapshotProcedure; + public SnapshotManager(RaftActorContext context, Logger logger) { this.context = context; this.LOG = logger; @@ -58,14 +61,19 @@ public class SnapshotManager implements SnapshotState { return currentState.capture(lastLogEntry, replicatedToAllIndex); } + @Override + public void apply(Snapshot snapshot) { + currentState.apply(snapshot); + } + @Override public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) { currentState.persist(snapshotBytes, currentBehavior, totalMemory); } @Override - public void commit(long sequenceNumber) { - currentState.commit(sequenceNumber); + public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) { + currentState.commit(sequenceNumber, currentBehavior); } @Override @@ -82,6 +90,10 @@ public class SnapshotManager implements SnapshotState { this.createSnapshotProcedure = createSnapshotProcedure; } + public void setApplySnapshotProcedure(Procedure applySnapshotProcedure) { + this.applySnapshotProcedure = applySnapshotProcedure; + } + public long getLastSequenceNumber() { return lastSequenceNumber; } @@ -118,13 +130,18 @@ public class SnapshotManager implements SnapshotState { return false; } + @Override + public void apply(Snapshot snapshot) { + LOG.debug("apply should not be called in state {}", this); + } + @Override public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) { LOG.debug("persist should not be called in state {}", this); } @Override - public void commit(long sequenceNumber) { + public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) { LOG.debug("commit should not be called in state {}", this); } @@ -228,6 +245,19 @@ public class SnapshotManager implements SnapshotState { return capture(lastLogEntry, replicatedToAllIndex, targetFollower); } + @Override + public void apply(Snapshot snapshot) { + applySnapshot = snapshot; + + lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber(); + + LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber); + + context.getPersistenceProvider().saveSnapshot(snapshot); + + SnapshotManager.this.currentState = PERSISTING; + } + @Override public String toString() { return "Idle"; @@ -322,28 +352,49 @@ public class SnapshotManager implements SnapshotState { private class Persisting extends AbstractSnapshotState { @Override - public void commit(long sequenceNumber) { + public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) { LOG.debug("Snapshot success sequence number:", sequenceNumber); - context.getReplicatedLog().snapshotCommit(); + + if(applySnapshot != null) { + try { + applySnapshotProcedure.apply(applySnapshot.getState()); + + //clears the followers log, sets the snapshot index to ensure adjusted-index works + context.setReplicatedLog(ReplicatedLogImpl.newInstance(applySnapshot, context, currentBehavior)); + context.setLastApplied(applySnapshot.getLastAppliedIndex()); + context.setCommitIndex(applySnapshot.getLastAppliedIndex()); + } catch (Exception e) { + LOG.error("Error applying snapshot", e); + } + } else { + context.getReplicatedLog().snapshotCommit(); + } + context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria( sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); context.getPersistenceProvider().deleteMessages(lastSequenceNumber); lastSequenceNumber = -1; + applySnapshot = null; SnapshotManager.this.currentState = IDLE; } @Override public void rollback() { - context.getReplicatedLog().snapshotRollback(); - - LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." + - "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - context.getReplicatedLog().size()); + // Nothing to rollback if we're applying a snapshot from the leader. + if(applySnapshot == null) { + context.getReplicatedLog().snapshotRollback(); + + LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().size()); + } + lastSequenceNumber = -1; + applySnapshot = null; SnapshotManager.this.currentState = IDLE; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java index 9949211c63..3167596cc3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -38,6 +38,13 @@ public interface SnapshotState { */ boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower); + /** + * Applies a snapshot on a follower that was installed by the leader. + * + * @param snapshot the Snapshot to apply. + */ + void apply(Snapshot snapshot); + /** * Persist the snapshot * @@ -52,7 +59,7 @@ public interface SnapshotState { * * @param sequenceNumber */ - void commit(long sequenceNumber); + void commit(long sequenceNumber, RaftActorBehavior currentBehavior); /** * Rollback the snapshot diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index a47cf11896..74440b5c24 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -50,6 +50,24 @@ import scala.concurrent.duration.FiniteDuration; */ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest { + public static class SetPeerAddress { + private final String peerId; + private final String peerAddress; + + public SetPeerAddress(String peerId, String peerAddress) { + this.peerId = peerId; + this.peerAddress = peerAddress; + } + + public String getPeerId() { + return peerId; + } + + public String getPeerAddress() { + return peerAddress; + } + } + public static class TestRaftActor extends MockRaftActor { private final TestActorRef collectorActor; @@ -97,6 +115,12 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest return; } + if(message instanceof SetPeerAddress) { + setPeerAddress(((SetPeerAddress) message).getPeerId().toString(), + ((SetPeerAddress) message).getPeerAddress()); + return; + } + try { if(!dropMessages.containsKey(message.getClass())) { super.handleCommand(message); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index d72d40416f..c1aa75a12d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -202,6 +202,10 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, @Override public void applyRecoverySnapshot(byte[] bytes) { recoveryCohortDelegate.applyRecoverySnapshot(bytes); + applySnapshotBytes(bytes); + } + + private void applySnapshotBytes(byte[] bytes) { try { Object data = toObject(bytes); if (data instanceof List) { @@ -222,6 +226,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, public void applySnapshot(byte [] snapshot) { LOG.info("{}: applySnapshot called", persistenceId()); snapshotCohortDelegate.applySnapshot(snapshot); + applySnapshotBytes(snapshot); } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java index 7509aa09a2..7d41508c3f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; @@ -16,15 +17,12 @@ import akka.actor.ActorRef; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotMetadata; -import java.util.Arrays; import java.util.Collections; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; -import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; @@ -92,29 +90,16 @@ public class RaftActorSnapshotMessageSupportTest { @Test public void testOnApplySnapshot() { - ReplicatedLog replicatedLog = context.getReplicatedLog(); - replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1"))); - - byte[] snapshotBytes = {1,2,3,4,5}; - - ReplicatedLogEntry unAppliedEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2")); - long lastAppliedDuringSnapshotCapture = 1; long lastIndexDuringSnapshotCapture = 2; + byte[] snapshotBytes = {1,2,3,4,5}; - Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry), + Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.emptyList(), lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1); sendMessageToSupport(new ApplySnapshot(snapshot)); - assertEquals("Journal log size", 1, context.getReplicatedLog().size()); - assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex()); - assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied()); - assertEquals("Commit index", -1, context.getCommitIndex()); - assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm()); - assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex()); - - verify(mockCohort).applySnapshot(snapshotBytes); + verify(mockSnapshotManager).apply(snapshot); } @Test @@ -132,7 +117,7 @@ public class RaftActorSnapshotMessageSupportTest { long sequenceNumber = 100; sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L))); - verify(mockSnapshotManager).commit(sequenceNumber); + verify(mockSnapshotManager).commit(eq(sequenceNumber), same(mockBehavior)); } @Test @@ -149,7 +134,7 @@ public class RaftActorSnapshotMessageSupportTest { sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT); - verify(mockSnapshotManager).commit(-1); + verify(mockSnapshotManager).commit(eq(-1L), same(mockBehavior)); } @Test diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index fb9541c8d9..eb70c8b028 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -585,7 +585,7 @@ public class RaftActorTest extends AbstractActorTest { assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - leaderActor.getRaftActorContext().getSnapshotManager().commit(-1); + leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -689,7 +689,7 @@ public class RaftActorTest extends AbstractActorTest { assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - followerActor.getRaftActorContext().getSnapshotManager().commit(-1); + followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java index 315583fa38..9fe03f9673 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java @@ -8,15 +8,22 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; +import akka.actor.ActorRef; import akka.persistence.SaveSnapshotSuccess; import com.google.common.collect.ImmutableMap; import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; /** @@ -34,8 +41,9 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)), newFollowerConfigParams()); - peerAddresses = ImmutableMap.builder(). - put(follower1Id, follower1Actor.path().toString()).build(); + Map peerAddresses = new HashMap<>(); + peerAddresses.put(follower1Id, follower1Actor.path().toString()); + peerAddresses.put(follower2Id, ""); leaderConfigParams = newLeaderConfigParams(); leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams); @@ -131,6 +139,76 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { leaderActor.underlyingActor().getState()); } + @Test + public void testFollowerRecoveryAfterInstallSnapshot() throws Exception { + + send2InitialPayloads(); + + leader = leaderActor.underlyingActor().getCurrentBehavior(); + + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)), + newFollowerConfigParams()); + follower2CollectorActor = follower2Actor.underlyingActor().collectorActor(); + + leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender()); + + MockPayload payload2 = sendPayloadData(leaderActor, "two"); + + // Verify the leader applies the 3rd payload state. + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1); + + MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1); + + assertEquals("Leader commit index", 2, leaderContext.getCommitIndex()); + assertEquals("Leader last applied", 2, leaderContext.getLastApplied()); + assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); + assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex()); + + killActor(follower2Actor); + + InMemoryJournal.clear(); + + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)), + newFollowerConfigParams()); + TestRaftActor follower2Underlying = follower2Actor.underlyingActor(); + follower2CollectorActor = follower2Underlying.collectorActor(); + follower2Context = follower2Underlying.getRaftActorContext(); + + leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender()); + + // The leader should install a snapshot so wait for the follower to receive ApplySnapshot. + MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class); + + // Wait for the follower to persist the snapshot. + MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class); + + // The last applied entry on the leader is included in the snapshot but is also sent in a subsequent + // AppendEntries because the InstallSnapshot message lastIncludedIndex field is set to the leader's + // snapshotIndex and not the actual last index included in the snapshot. + // FIXME? - is this OK? + MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class); + List expFollowerState = Arrays.asList(payload0, payload1, payload2, payload2); + + assertEquals("Follower commit index", 2, follower2Context.getCommitIndex()); + assertEquals("Follower last applied", 2, follower2Context.getLastApplied()); + assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex()); + assertEquals("Follower state", expFollowerState, follower2Underlying.getState()); + + killActor(follower2Actor); + + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)), + newFollowerConfigParams()); + + follower2Underlying = follower2Actor.underlyingActor(); + follower2Underlying.waitForRecoveryComplete(); + follower2Context = follower2Underlying.getRaftActorContext(); + + assertEquals("Follower commit index", 2, follower2Context.getCommitIndex()); + assertEquals("Follower last applied", 2, follower2Context.getLastApplied()); + assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex()); + assertEquals("Follower state", expFollowerState, follower2Underlying.getState()); + } + private void reinstateLeaderActor() { killActor(leaderActor); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index 89aadbe0ae..d94eb6b041 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -340,7 +340,7 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); - snapshotManager.commit(100L); + snapshotManager.commit(100L, mockRaftActorBehavior); verify(mockReplicatedLog).snapshotCommit(); @@ -361,7 +361,7 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.commit(100L); + snapshotManager.commit(100L, mockRaftActorBehavior); verify(mockReplicatedLog, never()).snapshotCommit(); @@ -373,7 +373,7 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCommitBeforeCapture(){ - snapshotManager.commit(100L); + snapshotManager.commit(100L, mockRaftActorBehavior); verify(mockReplicatedLog, never()).snapshotCommit(); @@ -393,9 +393,9 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); - snapshotManager.commit(100L); + snapshotManager.commit(100L, mockRaftActorBehavior); - snapshotManager.commit(100L); + snapshotManager.commit(100L, mockRaftActorBehavior); verify(mockReplicatedLog, times(1)).snapshotCommit(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index db04956f00..0b4abe98a1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -590,7 +590,8 @@ public class Shard extends RaftActor { } @Override - protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + @VisibleForTesting + public RaftActorSnapshotCohort getRaftActorSnapshotCohort() { return snapshotCohort; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 70869ee68a..576678a802 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -80,7 +80,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; @@ -113,6 +112,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; @@ -484,16 +484,20 @@ public class ShardTest extends AbstractShardTest { DataTree store = InMemoryDataTreeFactory.getInstance().create(); store.setSchemaContext(SCHEMA_CONTEXT); - writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild( + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build(); + + writeToStore(store, TestModel.TEST_PATH, container); YangInstanceIdentifier root = YangInstanceIdentifier.builder().build(); NormalizedNode expected = readStore(store, root); - ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create( - SerializationUtils.serializeNormalizedNode(expected), - Collections.emptyList(), 1, 2, 3, 4)); + Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected), + Collections.emptyList(), 1, 2, 3, 4); - shard.underlyingActor().onReceiveCommand(applySnapshot); + shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState()); NormalizedNode actual = readStore(shard, root); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index f1f96d4a7d..07b7f0e334 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -50,7 +50,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; @@ -120,11 +119,10 @@ public class PreLithiumShardTest extends AbstractShardTest { NormalizedNodeMessages.Container encode = codec.encode(expected); - ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create( - encode.getNormalizedNode().toByteString().toByteArray(), - Collections.emptyList(), 1, 2, 3, 4)); + Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(), + Collections.emptyList(), 1, 2, 3, 4); - shard.underlyingActor().onReceiveCommand(applySnapshot); + shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState()); NormalizedNode actual = readStore(shard, root); -- 2.36.6