package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+
import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
-import com.google.common.collect.ImmutableMap;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@Before
public void setup() {
- follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+ follower1Actor = newTestRaftActor(follower1Id, Map.of(leaderId, testActorPath(leaderId)),
newFollowerConfigParams());
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(follower1Id, follower1Actor.path().toString());
- peerAddresses.put(follower2Id, "");
-
leaderConfigParams = newLeaderConfigParams();
- leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+ leaderActor = newTestRaftActor(leaderId, Map.of(follower1Id, follower1Actor.path().toString(), follower2Id, ""),
+ leaderConfigParams);
follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
- MockPayload payload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload payload2 = sendPayloadData(leaderActor, "two");
// This should trigger a snapshot.
- MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ final MockPayload payload3 = sendPayloadData(leaderActor, "three");
MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
// Send another payload.
- MockPayload payload4 = sendPayloadData(leaderActor, "four");
+ final MockPayload payload4 = sendPayloadData(leaderActor, "four");
// Now deliver the AppendEntries to the follower
follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
- assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+ assertEquals("Leader state", List.of(payload0, payload1, payload2, payload3, payload4),
leaderActor.underlyingActor().getState());
}
// Block these messages initially so we can control the sequence.
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
- MockPayload payload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload payload2 = sendPayloadData(leaderActor, "two");
// This should trigger a snapshot.
- MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ final MockPayload payload3 = sendPayloadData(leaderActor, "three");
// Send another payload.
- MockPayload payload4 = sendPayloadData(leaderActor, "four");
+ final MockPayload payload4 = sendPayloadData(leaderActor, "four");
MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
- assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+ assertEquals("Leader state", List.of(payload0, payload1, payload2, payload3, payload4),
leaderActor.underlyingActor().getState());
}
@Test
- public void testFollowerRecoveryAfterInstallSnapshot() throws Exception {
+ public void testFollowerRecoveryAfterInstallSnapshot() {
send2InitialPayloads();
leader = leaderActor.underlyingActor().getCurrentBehavior();
- follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
- newFollowerConfigParams());
+ follower2Actor = newTestRaftActor(follower2Id,
+ Map.of(leaderId, testActorPath(leaderId)), newFollowerConfigParams());
follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
- MockPayload payload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload payload2 = sendPayloadData(leaderActor, "two");
// Verify the leader applies the 3rd payload state.
MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
InMemoryJournal.clear();
- follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
- newFollowerConfigParams());
+ follower2Actor = newTestRaftActor(follower2Id,
+ Map.of(leaderId, testActorPath(leaderId)), newFollowerConfigParams());
TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
follower2CollectorActor = follower2Underlying.collectorActor();
follower2Context = follower2Underlying.getRaftActorContext();
// Wait for the follower to persist the snapshot.
MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
- // The last applied entry on the leader is included in the snapshot but is also sent in a subsequent
- // AppendEntries because the InstallSnapshot message lastIncludedIndex field is set to the leader's
- // snapshotIndex and not the actual last index included in the snapshot.
- // FIXME? - is this OK?
- MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
- List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2, payload2);
+ final List<MockPayload> expFollowerState = List.of(payload0, payload1, payload2);
assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
- assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
killActor(follower2Actor);
- follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+ follower2Actor = newTestRaftActor(follower2Id, Map.of(leaderId, testActorPath(leaderId)),
newFollowerConfigParams());
follower2Underlying = follower2Actor.underlyingActor();
assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
- assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
}
+ @Test
+ public void testRecoveryDeleteEntries() {
+ send2InitialPayloads();
+
+ sendPayloadData(leaderActor, "two");
+
+ // This should trigger a snapshot.
+ sendPayloadData(leaderActor, "three");
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+
+ // Disconnect follower from leader
+ killActor(follower1Actor);
+
+ // Send another payloads
+ sendPayloadData(leaderActor, "four");
+ sendPayloadData(leaderActor, "five");
+
+ verifyRaftState(leaderActor, raftState -> {
+ assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
+ });
+
+ // Remove entries started from 4 index
+ leaderActor.underlyingActor().getReplicatedLog().removeFromAndPersist(4);
+
+ verifyRaftState(leaderActor, raftState -> {
+ assertEquals("leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
+ });
+
+ // Send new payloads
+ final MockPayload payload4 = sendPayloadData(leaderActor, "newFour");
+ final MockPayload payload5 = sendPayloadData(leaderActor, "newFive");
+
+ verifyRaftState(leaderActor, raftState -> {
+ assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
+ });
+
+ reinstateLeaderActor();
+
+ final var log = leaderActor.underlyingActor().getReplicatedLog();
+ assertEquals("Leader last index", 5, log.lastIndex());
+ assertEquals(List.of(payload4, payload5), List.of(log.get(4).getData(), log.get(5).getData()));
+ }
+
private void reinstateLeaderActor() {
killActor(leaderActor);