import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
+
+ List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
+ assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
+ ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
+ assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
+ assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
+ assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
+
+ persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
+ assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
+ logEntry = persistedLogEntries.get(0);
+ assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
+ assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
+ assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
+ logEntry.getData().getClass());
}
@Test
Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
+ // Send a second AddServer - should get queued
JavaTestKit testKit2 = new JavaTestKit(getSystem());
leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
+ // Continue the first AddServer
newFollowerRaftActorInstance.setDropMessageOfType(null);
newFollowerRaftActor.tell(installSnapshot, leaderActor);
+ // Verify both complete successfully
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
}
@Test
- public void testAddServerWithInstallSnapshotTimeout() throws Exception {
- newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+ public void testAddServerWithPriorSnapshotInProgress() throws Exception {
+ RaftActorContext initialActorContext = new MockRaftActorContext();
+ initialActorContext.setCommitIndex(-1);
+ initialActorContext.setLastApplied(-1);
+ initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+ RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+ TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID + "Collector"));
+ leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+ // Drop commit message for now to delay snapshot completion
+ leaderRaftActor.setDropMessageOfType(String.class);
+
+ leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
+
+ String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+ leaderRaftActor.setDropMessageOfType(null);
+ leaderActor.tell(commitMsg, leaderActor);
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+ assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+ expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+
+ // Verify ServerConfigurationPayload entry in leader's log
+
+ assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
+ assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
+ verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+ votingServer(NEW_SERVER_ID));
+ }
+
+ @Test
+ public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
+ RaftActorContext initialActorContext = new MockRaftActorContext();
+ initialActorContext.setCommitIndex(-1);
+ initialActorContext.setLastApplied(-1);
+ initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+ RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+
+ TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID + "Collector"));
+ leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+ // Drop commit message so the snapshot doesn't complete.
+ leaderRaftActor.setDropMessageOfType(String.class);
+
+ leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
+
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
+
+ assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+ }
+
+ @Test
+ public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
+ RaftActorContext initialActorContext = new MockRaftActorContext();
+ initialActorContext.setCommitIndex(-1);
+ initialActorContext.setLastApplied(-1);
+ initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+ RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
+
+ TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID + "Collector"));
+ leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+ // Drop the commit message so the snapshot doesn't complete yet.
+ leaderRaftActor.setDropMessageOfType(String.class);
+
+ leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
+
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+ String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+
+ // Change the leader behavior to follower
+ leaderActor.tell(new Follower(leaderActorContext), leaderActor);
+
+ // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
+ // snapshot completes. This will prevent the invalid snapshot from completing and fail the
+ // isCapturing assertion below.
+ leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
+
+ // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
+ leaderActor.tell(commitMsg, leaderActor);
+
+ leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+
+ assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+ assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
+ }
+
+ @Test
+ public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
+ RaftActorContext initialActorContext = new MockRaftActorContext();
+ initialActorContext.setCommitIndex(-1);
+ initialActorContext.setLastApplied(-1);
+ initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+ RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
+
+ TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID + "Collector"));
+ leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+ // Drop the UnInitializedFollowerSnapshotReply to delay it.
+ leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
+
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+ UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
+ UnInitializedFollowerSnapshotReply.class);
+
+ // Prevent election timeout when the leader switches to follower
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
+
+ // Change the leader behavior to follower
+ leaderActor.tell(new Follower(leaderActorContext), leaderActor);
+
+ // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
+ leaderRaftActor.setDropMessageOfType(null);
+ leaderActor.tell(snapshotReply, leaderActor);
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+
+ assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+ }
+ @Test
+ public void testAddServerWithInstallSnapshotTimeout() throws Exception {
RaftActorContext initialActorContext = new MockRaftActorContext();
initialActorContext.setCommitIndex(-1);
initialActorContext.setLastApplied(-1);
RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+ // Drop the InstallSnapshot message so it times out
+ newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+ leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
+
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
return followerActorContext;
}
- public static class MockLeaderRaftActor extends MockRaftActor {
+ static abstract class AbstractMockRaftActor extends MockRaftActor {
+ private volatile TestActorRef<MessageCollectorActor> collectorActor;
+ private volatile Class<?> dropMessageOfType;
+
+ AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+ DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
+ super(id, peerAddresses, config, dataPersistenceProvider);
+ this.collectorActor = collectorActor;
+ }
+
+ void setDropMessageOfType(Class<?> dropMessageOfType) {
+ this.dropMessageOfType = dropMessageOfType;
+ }
+
+ void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
+ this.collectorActor = collectorActor;
+ }
+
+ @Override
+ public void handleCommand(Object message) {
+ if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
+ super.handleCommand(message);
+ }
+
+ if(collectorActor != null) {
+ collectorActor.tell(message, getSender());
+ }
+ }
+ }
+
+ public static class MockLeaderRaftActor extends AbstractMockRaftActor {
public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
RaftActorContext fromContext) {
- super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
+ super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
+ setPersistence(false);
RaftActorContext context = getRaftActorContext();
for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
}
}
- public static class MockNewFollowerRaftActor extends MockRaftActor {
- private final TestActorRef<MessageCollectorActor> collectorActor;
- private volatile Class<?> dropMessageOfType;
-
+ public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
- super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null);
- this.collectorActor = collectorActor;
- }
-
- void setDropMessageOfType(Class<?> dropMessageOfType) {
- this.dropMessageOfType = dropMessageOfType;
- }
-
- @Override
- public void handleCommand(Object message) {
- if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
- super.handleCommand(message);
- }
-
- collectorActor.tell(message, getSender());
+ super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
+ setPersistence(false);
}
static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {