package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
MessageCollectorActor.clearMessages(leaderCollectorActor);
- testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 1 more payload to trigger second snapshot");
+ testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: sending 1 more payload to trigger second snapshot");
+
+ // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+ Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+ TimeUnit.MILLISECONDS);
// Send another payload to trigger a second leader snapshot.
MockPayload payload7 = sendPayloadData(leaderActor, "seven");
expSnapshotState.add(payload5);
expSnapshotState.add(payload6);
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+ MessageCollectorActor.clearMessages(follower1CollectorActor);
+
+ // Send a server config change to test that the install snapshot includes the server config.
+
+ ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(leaderId, true),
+ new ServerInfo(follower1Id, false),
+ new ServerInfo(follower2Id, false)));
+ leaderContext.updatePeerIds(serverConfig);
+ ((AbstractLeader)leader).updateMinReplicaCount();
+ leaderActor.tell(serverConfig, ActorRef.noSender());
+
+ applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+ verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig);
+
+ applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
+ verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig);
+
// Verify the leader's persisted snapshot.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
expSnapshotState.add(payload7);
- verifyInstallSnapshotToLaggingFollower(7);
+ verifyInstallSnapshotToLaggingFollower(8, serverConfig);
testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
}
expSnapshotState.add(payload1);
+ // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+ Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+ TimeUnit.MILLISECONDS);
+
// Send another payload with a large enough relative size in combination with the last payload
// that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
MockPayload payload2 = sendPayloadData(leaderActor, "two", 201);
expSnapshotState.add(payload2);
- verifyInstallSnapshotToLaggingFollower(2L);
+ verifyInstallSnapshotToLaggingFollower(2L, null);
// Sends a payload with index 3.
verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
*
* @throws Exception
*/
- private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex) throws Exception {
+ private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
+ @Nullable ServerConfigurationPayload expServerConfig) throws Exception {
List<Snapshot> persistedSnapshots;
List<ReplicatedLogEntry> unAppliedEntry;
ApplySnapshot applySnapshot;
InstallSnapshot installSnapshot;
- InstallSnapshotReply installSnapshotReply;
testLog.info("testInstallSnapshotToLaggingFollower starting");
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+
// Now stop dropping AppendEntries in follower 2.
follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
+ // the snapshot store because the second snapshot was initiated by the follower install snapshot and
+ // not because the batch count was reached so the persisted journal sequence number wasn't advanced
+ // far enough to cause the previous snapshot to be deleted. This is because
+ // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
+ // This is OK - the next snapshot should delete it. In production, even if the system restarted
+ // before another snapshot, they would both get applied which wouldn't hurt anything.
+ persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
+ Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
+ Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
+ unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
+ assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
+
+ int snapshotSize = persistedSnapshot.getState().length;
+ int expTotalChunks = (snapshotSize / SNAPSHOT_CHUNK_SIZE) + ((snapshotSize % SNAPSHOT_CHUNK_SIZE) > 0 ? 1 : 0);
+
installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, InstallSnapshot.class);
assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
- assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
+ assertEquals("InstallSnapshot getTotalChunks", expTotalChunks, installSnapshot.getTotalChunks());
assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
//assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
- installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
- assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
- assertEquals("InstallSnapshotReply getChunkIndex", 1, installSnapshotReply.getChunkIndex());
- assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
- assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+ List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
+ leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
+ int index = 1;
+ for(InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
+ assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
+ assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
+ assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
+ assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+ }
// Verify follower 2 applies the snapshot.
applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
// the log. In addition replicatedToAllIndex should've advanced.
verifyLeadersTrimmedLog(lastAppliedIndex);
- // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
- // the snapshot store because the second snapshot was initiated by the follower install snapshot and
- // not because the batch count was reached so the persisted journal sequence number wasn't advanced
- // far enough to cause the previous snapshot to be deleted. This is because
- // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
- // This is OK - the next snapshot should delete it. In production, even if the system restarted
- // before another snapshot, they would both get applied which wouldn't hurt anything.
- persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
- Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
- Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
- unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
- assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
+ if(expServerConfig != null) {
+ Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
+ assertEquals("Leader snapshot server config", expServerInfo,
+ new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
+
+ assertEquals("Follower 2 snapshot server config", expServerInfo,
+ new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
+
+ ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
+ assertNotNull("Follower 2 server config is null", follower2ServerConfig);
+
+ assertEquals("Follower 2 server config", expServerInfo,
+ new HashSet<>(follower2ServerConfig.getServerConfig()));
+ }
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);