import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.SerializationUtils;
-import org.junit.Assert;
+import org.eclipse.jdt.annotation.Nullable;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
// Create the leader and 2 follower actors.
- follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Actor = newTestRaftActor(follower1Id, Map.of(leaderId, testActorPath(leaderId),
follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
- follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower2Actor = newTestRaftActor(follower2Id, Map.of(leaderId, testActorPath(leaderId),
follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
- Map<String, String> leaderPeerAddresses = ImmutableMap.<String, String>builder()
- .put(follower1Id, follower1Actor.path().toString())
- .put(follower2Id, follower2Actor.path().toString()).build();
+ Map<String, String> leaderPeerAddresses = Map.of(
+ follower1Id, follower1Actor.path().toString(),
+ follower2Id, follower2Actor.path().toString());
leaderConfigParams = newLeaderConfigParams();
leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
testLog.info("Leader created and elected");
}
+ private void setupFollower2() {
+ follower2Actor = newTestRaftActor(follower2Id, Map.of(leaderId, testActorPath(leaderId),
+ follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
+
+ follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
+ follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
+
+ follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+ }
+
/**
* Send 2 payload instances with follower 2 lagging, then resume the follower and verify it gets
* caught up via AppendEntries.
// to catch it up because no snapshotting was done so the follower's next index was present in the log.
InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
InstallSnapshot.class);
- Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
+ assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries complete");
}
// Verify the leader did not try to install a snapshot to catch up follower 2.
InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
InstallSnapshot.class);
- Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
+ assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
// Ensure there's at least 1 more heartbeat.
MessageCollectorActor.clearMessages(leaderCollectorActor);
// Send a server config change to test that the install snapshot includes the server config.
- ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+ ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(List.of(
new ServerInfo(leaderId, true),
new ServerInfo(follower1Id, false),
new ServerInfo(follower2Id, false)));
testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
}
+ /**
+  * Tests whether the leader reattempts to send a snapshot when a follower crashes before replying with
+  * InstallSnapshotReply after the last chunk has been sent.
+  *
+  * <p>Follower 2 is stopped while the leader takes two snapshots. It is then restarted twice: first with the
+  * leader dropping the InstallSnapshotReply for chunk index 5, and again, with its persisted snapshots
+  * cleared, once the leader has stopped dropping replies. The second incarnation is expected to report a
+  * successful snapshot save.
+  */
+ @Test
+ public void testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation() throws Exception {
+     testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation starting");
+
+     setup();
+
+     sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
+
+     // Stop follower 2 so that it misses the following payloads and lags behind the leader.
+     follower2Actor.stop();
+
+     // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+     Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+         TimeUnit.MILLISECONDS);
+
+     // Send 5 payloads - the second should cause a leader snapshot.
+     final MockPayload payload2 = sendPayloadData(leaderActor, "two");
+     final MockPayload payload3 = sendPayloadData(leaderActor, "three");
+     final MockPayload payload4 = sendPayloadData(leaderActor, "four");
+     final MockPayload payload5 = sendPayloadData(leaderActor, "five");
+     final MockPayload payload6 = sendPayloadData(leaderActor, "six");
+
+     MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+     // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
+     List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
+     verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
+     verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
+     verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
+
+     MessageCollectorActor.clearMessages(leaderCollectorActor);
+
+     testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation: "
+         + "sending 1 more payload to trigger second snapshot");
+
+     // Send another payload to trigger a second leader snapshot.
+     MockPayload payload7 = sendPayloadData(leaderActor, "seven");
+
+     MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+     ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+     verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
+
+     // Verify follower 1 applies each log entry.
+     applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
+     verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
+     verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
+     verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
+
+     // Make the leader drop any InstallSnapshotReply whose chunk index is 5.
+     // NOTE(review): 5 is presumably the index of the last chunk for this snapshot - confirm.
+     leaderActor.underlyingActor()
+         .startDropMessages(InstallSnapshotReply.class, reply -> reply.getChunkIndex() == 5);
+
+     // Bring follower 2 back and wait until it has received an InstallSnapshot message.
+     setupFollower2();
+
+     MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 1);
+
+     // Simulate the follower crash by stopping follower 2 again.
+     follower2Actor.stop();
+
+     // need to get rid of persistence for follower2
+     InMemorySnapshotStore.clearSnapshotsFor(follower2Id);
+
+     // Let InstallSnapshotReply messages through again before follower 2 is restarted.
+     leaderActor.underlyingActor().stopDropMessages(InstallSnapshotReply.class);
+
+     MessageCollectorActor.clearMessages(follower2CollectorActor);
+     setupFollower2();
+
+     // Verify the restarted follower 2 reports a successful snapshot save.
+     MessageCollectorActor.expectMatching(follower2CollectorActor, SaveSnapshotSuccess.class, 1);
+
+     testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation complete");
+ }
+
+
/**
* Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
* leader snapshot such that the leader trims its log from the last applied index. Follower 2's log will
// Verify a snapshot is not triggered.
CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor,
CaptureSnapshot.class);
- Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
+ assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
expSnapshotState.add(payload1);
verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
- Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
+ assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
// Verify the follower 1 applies the state.
applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
/**
* Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
*/
- private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
- @Nullable ServerConfigurationPayload expServerConfig) {
+ private void verifyInstallSnapshotToLaggingFollower(final long lastAppliedIndex,
+ final @Nullable ServerConfigurationPayload expServerConfig) {
testLog.info("verifyInstallSnapshotToLaggingFollower starting");
MessageCollectorActor.clearMessages(leaderCollectorActor);
// This is OK - the next snapshot should delete it. In production, even if the system restarted
// before another snapshot, they would both get applied which wouldn't hurt anything.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
- Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
+ assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
verifyLeadersTrimmedLog(lastAppliedIndex);
if (expServerConfig != null) {
- Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
+ Set<ServerInfo> expServerInfo = Set.copyOf(expServerConfig.getServerConfig());
assertEquals("Leader snapshot server config", expServerInfo,
- new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
+ Set.copyOf(persistedSnapshot.getServerConfiguration().getServerConfig()));
assertEquals("Follower 2 snapshot server config", expServerInfo,
- new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
+ Set.copyOf(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
assertNotNull("Follower 2 server config is null", follower2ServerConfig);
assertEquals("Follower 2 server config", expServerInfo,
- new HashSet<>(follower2ServerConfig.getServerConfig()));
+ Set.copyOf(follower2ServerConfig.getServerConfig()));
}
MessageCollectorActor.clearMessages(leaderCollectorActor);
// Verify the leader's persisted journal log - it should only contain the last 2 ReplicatedLogEntries
// added after the snapshot as the persisted journal should've been purged to the snapshot
// sequence number.
- verifyPersistedJournal(leaderId, Arrays.asList(new SimpleReplicatedLogEntry(5, currentTerm, payload5),
- new SimpleReplicatedLogEntry(6, currentTerm, payload6)));
+ verifyPersistedJournal(leaderId, List.of(
+ new SimpleReplicatedLogEntry(5, currentTerm, payload5),
+ new SimpleReplicatedLogEntry(6, currentTerm, payload6)));
// Verify the leader's persisted journal contains an ApplyJournalEntries for at least the last entry index.
List<ApplyJournalEntries> persistedApplyJournalEntries =
}
}
- Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 6),
- found);
+ assertTrue("ApplyJournalEntries with index 6 not found in leader's persisted journal", found);
// Verify follower 1 applies the 3 log entries.
applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
/**
* Kill the leader actor, reinstate it and verify the recovered journal.
*/
- private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
- long firstJournalEntryIndex) {
+ private void verifyLeaderRecoveryAfterReinstatement(final long lastIndex, final long snapshotIndex,
+ final long firstJournalEntryIndex) {
testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
+ "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
}
- private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
+ private void sendInitialPayloadsReplicatedToAllFollowers(final String... data) {
// Send the payloads.
for (String d: data) {