private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
@Override
- public void apply(Void notUsed) throws Exception {
+ public void apply(Void notUsed) {
cohort.createSnapshot(context.getActor());
}
};
+ private final Procedure<byte[]> applySnapshotProcedure = new Procedure<byte[]>() {
+ @Override
+ public void apply(byte[] state) {
+ cohort.applySnapshot(state);
+ }
+ };
+
RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
RaftActorSnapshotCohort cohort) {
this.context = context;
this.log = context.getLogger();
context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
+ context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure);
}
boolean handleSnapshotMessage(Object message) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
return true;
} else if (message.equals(COMMIT_SNAPSHOT)) {
- context.getSnapshotManager().commit(-1);
+ context.getSnapshotManager().commit(-1, currentBehavior);
return true;
} else {
return false;
long sequenceNumber = success.metadata().sequenceNr();
- context.getSnapshotManager().commit(sequenceNumber);
+ context.getSnapshotManager().commit(sequenceNumber, currentBehavior);
}
private void onApplySnapshot(Snapshot snapshot) {
- if(log.isDebugEnabled()) {
- log.debug("{}: ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
- snapshot.getLastAppliedTerm());
- }
-
- cohort.applySnapshot(snapshot.getState());
+ log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
+ snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm());
- //clears the followers log, sets the snapshot index to ensure adjusted-index works
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
- context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.getSnapshotManager().apply(snapshot);
}
}
private Procedure<Void> createSnapshotProcedure;
+ private Snapshot applySnapshot;
+ private Procedure<byte[]> applySnapshotProcedure;
+
public SnapshotManager(RaftActorContext context, Logger logger) {
this.context = context;
this.LOG = logger;
return currentState.capture(lastLogEntry, replicatedToAllIndex);
}
+ @Override
+ public void apply(Snapshot snapshot) {
+ currentState.apply(snapshot);
+ }
+
@Override
public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
currentState.persist(snapshotBytes, currentBehavior, totalMemory);
}
@Override
- public void commit(long sequenceNumber) {
- currentState.commit(sequenceNumber);
+ public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+ currentState.commit(sequenceNumber, currentBehavior);
}
@Override
this.createSnapshotProcedure = createSnapshotProcedure;
}
+ public void setApplySnapshotProcedure(Procedure<byte[]> applySnapshotProcedure) {
+ this.applySnapshotProcedure = applySnapshotProcedure;
+ }
+
public long getLastSequenceNumber() {
return lastSequenceNumber;
}
return false;
}
+ @Override
+ public void apply(Snapshot snapshot) {
+ LOG.debug("apply should not be called in state {}", this);
+ }
+
@Override
public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
LOG.debug("persist should not be called in state {}", this);
}
@Override
- public void commit(long sequenceNumber) {
+ public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
LOG.debug("commit should not be called in state {}", this);
}
return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
}
+ @Override
+ public void apply(Snapshot snapshot) {
+ applySnapshot = snapshot;
+
+ lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
+
+ LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
+
+ context.getPersistenceProvider().saveSnapshot(snapshot);
+
+ SnapshotManager.this.currentState = PERSISTING;
+ }
+
@Override
public String toString() {
return "Idle";
private class Persisting extends AbstractSnapshotState {
@Override
- public void commit(long sequenceNumber) {
+ public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
LOG.debug("Snapshot success sequence number:", sequenceNumber);
- context.getReplicatedLog().snapshotCommit();
+
+ if(applySnapshot != null) {
+ try {
+ applySnapshotProcedure.apply(applySnapshot.getState());
+
+ //clears the followers log, sets the snapshot index to ensure adjusted-index works
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(applySnapshot, context, currentBehavior));
+ context.setLastApplied(applySnapshot.getLastAppliedIndex());
+ context.setCommitIndex(applySnapshot.getLastAppliedIndex());
+ } catch (Exception e) {
+ LOG.error("Error applying snapshot", e);
+ }
+ } else {
+ context.getReplicatedLog().snapshotCommit();
+ }
+
context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
lastSequenceNumber = -1;
+ applySnapshot = null;
SnapshotManager.this.currentState = IDLE;
}
@Override
public void rollback() {
- context.getReplicatedLog().snapshotRollback();
-
- LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().size());
+ // Nothing to rollback if we're applying a snapshot from the leader.
+ if(applySnapshot == null) {
+ context.getReplicatedLog().snapshotRollback();
+
+ LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().size());
+ }
+ lastSequenceNumber = -1;
+ applySnapshot = null;
SnapshotManager.this.currentState = IDLE;
}
*/
boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
+ /**
+ * Applies a snapshot on a follower that was installed by the leader.
+ *
+ * @param snapshot the Snapshot to apply.
+ */
+ void apply(Snapshot snapshot);
+
/**
* Persist the snapshot
*
*
* @param sequenceNumber
*/
- void commit(long sequenceNumber);
+ void commit(long sequenceNumber, RaftActorBehavior currentBehavior);
/**
* Rollback the snapshot
*/
public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest {
+ public static class SetPeerAddress {
+ private final String peerId;
+ private final String peerAddress;
+
+ public SetPeerAddress(String peerId, String peerAddress) {
+ this.peerId = peerId;
+ this.peerAddress = peerAddress;
+ }
+
+ public String getPeerId() {
+ return peerId;
+ }
+
+ public String getPeerAddress() {
+ return peerAddress;
+ }
+ }
+
public static class TestRaftActor extends MockRaftActor {
private final TestActorRef<MessageCollectorActor> collectorActor;
return;
}
+ if(message instanceof SetPeerAddress) {
+ setPeerAddress(((SetPeerAddress) message).getPeerId().toString(),
+ ((SetPeerAddress) message).getPeerAddress());
+ return;
+ }
+
try {
if(!dropMessages.containsKey(message.getClass())) {
super.handleCommand(message);
@Override
public void applyRecoverySnapshot(byte[] bytes) {
recoveryCohortDelegate.applyRecoverySnapshot(bytes);
+ applySnapshotBytes(bytes);
+ }
+
+ private void applySnapshotBytes(byte[] bytes) {
try {
Object data = toObject(bytes);
if (data instanceof List) {
public void applySnapshot(byte [] snapshot) {
LOG.info("{}: applySnapshot called", persistenceId());
snapshotCohortDelegate.applySnapshot(snapshot);
+ applySnapshotBytes(snapshot);
}
@Override
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotMetadata;
-import java.util.Arrays;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@Test
public void testOnApplySnapshot() {
- ReplicatedLog replicatedLog = context.getReplicatedLog();
- replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
-
- byte[] snapshotBytes = {1,2,3,4,5};
-
- ReplicatedLogEntry unAppliedEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
-
long lastAppliedDuringSnapshotCapture = 1;
long lastIndexDuringSnapshotCapture = 2;
+ byte[] snapshotBytes = {1,2,3,4,5};
- Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry),
+ Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.<ReplicatedLogEntry>emptyList(),
lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
sendMessageToSupport(new ApplySnapshot(snapshot));
- assertEquals("Journal log size", 1, context.getReplicatedLog().size());
- assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
- assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
- assertEquals("Commit index", -1, context.getCommitIndex());
- assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
- assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
-
- verify(mockCohort).applySnapshot(snapshotBytes);
+ verify(mockSnapshotManager).apply(snapshot);
}
@Test
long sequenceNumber = 100;
sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
- verify(mockSnapshotManager).commit(sequenceNumber);
+ verify(mockSnapshotManager).commit(eq(sequenceNumber), same(mockBehavior));
}
@Test
sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
- verify(mockSnapshotManager).commit(-1);
+ verify(mockSnapshotManager).commit(eq(-1L), same(mockBehavior));
}
@Test
assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
+ followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
/**
follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
newFollowerConfigParams());
- peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).build();
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1Id, follower1Actor.path().toString());
+ peerAddresses.put(follower2Id, "");
leaderConfigParams = newLeaderConfigParams();
leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
leaderActor.underlyingActor().getState());
}
+ @Test
+ public void testFollowerRecoveryAfterInstallSnapshot() throws Exception {
+
+ send2InitialPayloads();
+
+ leader = leaderActor.underlyingActor().getCurrentBehavior();
+
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+ newFollowerConfigParams());
+ follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+
+ leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
+
+ MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+ // Verify the leader applies the 3rd payload state.
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+
+ MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
+
+ assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
+ assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
+ assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
+
+ killActor(follower2Actor);
+
+ InMemoryJournal.clear();
+
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+ newFollowerConfigParams());
+ TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
+ follower2CollectorActor = follower2Underlying.collectorActor();
+ follower2Context = follower2Underlying.getRaftActorContext();
+
+ leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
+
+ // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
+ MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
+
+ // Wait for the follower to persist the snapshot.
+ MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
+
+ // The last applied entry on the leader is included in the snapshot but is also sent in a subsequent
+ // AppendEntries because the InstallSnapshot message lastIncludedIndex field is set to the leader's
+ // snapshotIndex and not the actual last index included in the snapshot.
+ // FIXME? - is this OK?
+ MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
+ List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2, payload2);
+
+ assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
+ assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
+ assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
+
+ killActor(follower2Actor);
+
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+ newFollowerConfigParams());
+
+ follower2Underlying = follower2Actor.underlyingActor();
+ follower2Underlying.waitForRecoveryComplete();
+ follower2Context = follower2Underlying.getRaftActorContext();
+
+ assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
+ assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
+ assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
+ }
+
private void reinstateLeaderActor() {
killActor(leaderActor);
snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, mockRaftActorBehavior);
verify(mockReplicatedLog).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, mockRaftActorBehavior);
verify(mockReplicatedLog, never()).snapshotCommit();
@Test
public void testCommitBeforeCapture(){
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, mockRaftActorBehavior);
verify(mockReplicatedLog, never()).snapshotCommit();
snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, mockRaftActorBehavior);
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, mockRaftActorBehavior);
verify(mockReplicatedLog, times(1)).snapshotCommit();
}
@Override
- protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ @VisibleForTesting
+ public RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
return snapshotCohort;
}
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
DataTree store = InMemoryDataTreeFactory.getInstance().create();
store.setSchemaContext(SCHEMA_CONTEXT);
- writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
+
+ writeToStore(store, TestModel.TEST_PATH, container);
YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
NormalizedNode<?,?> expected = readStore(store, root);
- ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
- SerializationUtils.serializeNormalizedNode(expected),
- Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
+ Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
+ Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
- shard.underlyingActor().onReceiveCommand(applySnapshot);
+ shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
NormalizedNode<?,?> actual = readStore(shard, root);
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
NormalizedNodeMessages.Container encode = codec.encode(expected);
- ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
- encode.getNormalizedNode().toByteString().toByteArray(),
- Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
+ Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
- shard.underlyingActor().onReceiveCommand(applySnapshot);
+ shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
NormalizedNode<?,?> actual = readStore(shard, root);