import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.jdt.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.jdt.annotation.Nullable;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
// Create the leader and 2 follower actors.
InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
// Create the leader and 2 follower actors.
- follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Actor = newTestRaftActor(follower1Id, Map.of(leaderId, testActorPath(leaderId),
- follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower2Actor = newTestRaftActor(follower2Id, Map.of(leaderId, testActorPath(leaderId),
- Map<String, String> leaderPeerAddresses = ImmutableMap.<String, String>builder()
- .put(follower1Id, follower1Actor.path().toString())
- .put(follower2Id, follower2Actor.path().toString()).build();
+ Map<String, String> leaderPeerAddresses = Map.of(
+ follower1Id, follower1Actor.path().toString(),
+ follower2Id, follower2Actor.path().toString());
leaderConfigParams = newLeaderConfigParams();
leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
leaderConfigParams = newLeaderConfigParams();
leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
currentTerm = leaderContext.getTermInformation().getCurrentTerm();
follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
currentTerm = leaderContext.getTermInformation().getCurrentTerm();
leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
- follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower2Actor = newTestRaftActor(follower2Id, Map.of(leaderId, testActorPath(leaderId),
follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
// to catch it up because no snapshotting was done so the follower's next index was present in the log.
InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
InstallSnapshot.class);
// to catch it up because no snapshotting was done so the follower's next index was present in the log.
InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
InstallSnapshot.class);
// Verify the leader did not try to install a snapshot to catch up follower 2.
InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
InstallSnapshot.class);
// Verify the leader did not try to install a snapshot to catch up follower 2.
InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
InstallSnapshot.class);
// Ensure there's at least 1 more heartbeat.
MessageCollectorActor.clearMessages(leaderCollectorActor);
// Ensure there's at least 1 more heartbeat.
MessageCollectorActor.clearMessages(leaderCollectorActor);
new ServerInfo(leaderId, true),
new ServerInfo(follower1Id, false),
new ServerInfo(follower2Id, false)));
new ServerInfo(leaderId, true),
new ServerInfo(follower1Id, false),
new ServerInfo(follower2Id, false)));
// Verify a snapshot is not triggered.
CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor,
CaptureSnapshot.class);
// Verify a snapshot is not triggered.
CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor,
CaptureSnapshot.class);
verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
// Verify the follower 1 applies the state.
applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
// Verify the follower 1 applies the state.
applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
// This is OK - the next snapshot should delete it. In production, even if the system restarted
// before another snapshot, they would both get applied which wouldn't hurt anything.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
// This is OK - the next snapshot should delete it. In production, even if the system restarted
// before another snapshot, they would both get applied which wouldn't hurt anything.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
int snapshotSize = SerializationUtils.serialize(persistedSnapshot.getState()).length;
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
int snapshotSize = SerializationUtils.serialize(persistedSnapshot.getState()).length;
- final int expTotalChunks = snapshotSize / SNAPSHOT_CHUNK_SIZE
- + (snapshotSize % SNAPSHOT_CHUNK_SIZE > 0 ? 1 : 0);
+ final int expTotalChunks = snapshotSize / MAXIMUM_MESSAGE_SLICE_SIZE
+ + (snapshotSize % MAXIMUM_MESSAGE_SLICE_SIZE > 0 ? 1 : 0);
assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
assertNotNull("Follower 2 server config is null", follower2ServerConfig);
assertEquals("Follower 2 server config", expServerInfo,
ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
assertNotNull("Follower 2 server config is null", follower2ServerConfig);
assertEquals("Follower 2 server config", expServerInfo,
// Verify the leaders's persisted journal log - it should only contain the last 2 ReplicatedLogEntries
// added after the snapshot as the persisted journal should've been purged to the snapshot
// sequence number.
// Verify the leaders's persisted journal log - it should only contain the last 2 ReplicatedLogEntries
// added after the snapshot as the persisted journal should've been purged to the snapshot
// sequence number.
- verifyPersistedJournal(leaderId, Arrays.asList(new SimpleReplicatedLogEntry(5, currentTerm, payload5),
- new SimpleReplicatedLogEntry(6, currentTerm, payload6)));
+ verifyPersistedJournal(leaderId, List.of(
+ new SimpleReplicatedLogEntry(5, currentTerm, payload5),
+ new SimpleReplicatedLogEntry(6, currentTerm, payload6)));
// Verify the leaders's persisted journal contains an ApplyJournalEntries for at least the last entry index.
List<ApplyJournalEntries> persistedApplyJournalEntries =
// Verify the leaders's persisted journal contains an ApplyJournalEntries for at least the last entry index.
List<ApplyJournalEntries> persistedApplyJournalEntries =
// Verify follower 1 applies the 3 log entries.
applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
// Verify follower 1 applies the 3 log entries.
applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
int numEntries = data.length;
// Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
int numEntries = data.length;
// Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
- List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor,
- ApplyState.class, numEntries);
+ final var leaderStates = MessageCollectorActor.expectMatching(leaderCollectorActor,
+ ApplyState.class, numEntries);
- MockPayload payload = expSnapshotState.get(i);
- verifyApplyState(applyStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload);
+ final MockPayload payload = expSnapshotState.get(i);
+ verifyApplyState(leaderStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload);
- applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, numEntries);
+ final var follower1States = MessageCollectorActor.expectMatching(follower1CollectorActor,
+ ApplyState.class, numEntries);
- MockPayload payload = expSnapshotState.get(i);
- verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
+ final MockPayload payload = expSnapshotState.get(i);
+ verifyApplyState(follower1States.get(i), null, null, currentTerm, i, payload);
- applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, numEntries);
+ final var follower2States = MessageCollectorActor.expectMatching(follower2CollectorActor,
+ ApplyState.class, numEntries);
- MockPayload payload = expSnapshotState.get(i);
- verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
+ final MockPayload payload = expSnapshotState.get(i);
+ verifyApplyState(follower2States.get(i), null, null, currentTerm, i, payload);