+ @Test
+ public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
+ LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
+
+ final String node1ID = "node1";
+ final String node2ID = "node2";
+
+ PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+ peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
+
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
+ ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+ InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+ InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+
+ DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
+ configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ configParams1.setElectionTimeoutFactor(1);
+ configParams1.setPeerAddressResolver(peerAddressResolver);
+ TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
+ PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+ CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+ DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
+ configParams2.setElectionTimeoutFactor(1000000);
+ configParams2.setPeerAddressResolver(peerAddressResolver);
+ TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
+ PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+ CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+ // Wait for snapshot after recovery
+ MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+ // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
+ // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
+ // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
+ // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
+ // node2 was previously voting.
+
+ node2RaftActor.setDropMessageOfType(RequestVote.class);
+
+ ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
+ node1RaftActorRef.tell(changeServers, testKit.getRef());
+ ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+ assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
+
+ assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
+ Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
+ assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+
+ LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
+ }
+
+ @Test
+ public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
+ LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
+
+ final String node1ID = "node1";
+ final String node2ID = "node2";
+
+ PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+ peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(3);
+ configParams.setPeerAddressResolver(peerAddressResolver);
+
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
+ ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+ InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+ InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+ InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
+ new MockRaftActorContext.MockPayload("2")));
+
+ TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
+ PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+ CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+ TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
+ PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+ CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+ // Wait for snapshot after recovery
+ MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+ // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
+ // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
+ // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
+ // forward the request to node2.
+
+ ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
+ ImmutableMap.of(node1ID, true, node2ID, true));
+ node1RaftActorRef.tell(changeServers, testKit.getRef());
+ ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
+
+ MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+ verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
+ votingServer(node1ID), votingServer(node2ID));
+ assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
+
+ MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+ verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
+ votingServer(node1ID), votingServer(node2ID));
+ assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
+ assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+
+ LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
+ }
+
+ @Test
+ public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
+ LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+
+ final String node1ID = "node1";
+ final String node2ID = "node2";
+
+ configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+ peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null);
+
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
+ ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+ InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+ InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
+ InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+
+ TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
+ PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+ CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+ TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
+ PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+ CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+ // Wait for snapshot after recovery
+ MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+ // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
+ // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
+ // RequestVote messages in node2 and make it the leader so node1 should forward the server change
+ // request to node2 when node2 is elected.
+
+ node2RaftActor.setDropMessageOfType(RequestVote.class);
+
+ ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
+ node2ID, true));
+ node1RaftActorRef.tell(changeServers, testKit.getRef());
+
+ MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
+
+ node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+
+ ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
+
+ MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+ verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
+ votingServer(node1ID), votingServer(node2ID));
+ assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
+ assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+
+ MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+ verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
+ votingServer(node1ID), votingServer(node2ID));
+ assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
+
+ LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
+ }
+
+ private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ for(RaftActor raftActor: raftActors) {
+ if(raftActor.getRaftState() == expState) {
+ return;
+ }
+ }
+ }
+
+ fail("None of the RaftActors have state " + expState);
+ }
+
+ private static ServerInfo votingServer(String id) {