Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17
18 import akka.actor.AbstractActor;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.io.ByteSource;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import java.io.OutputStream;
28 import java.time.Duration;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Optional;
32 import java.util.Set;
33 import java.util.concurrent.TimeUnit;
34 import org.apache.commons.lang3.SerializationUtils;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
41 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
44 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
45 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
46 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
47 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
48 import org.opendaylight.controller.cluster.raft.messages.AddServer;
49 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
50 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
51 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
52 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
54 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
55 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
56 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
57 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
58 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
59 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
61 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
62 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
63 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
64 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
65 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
66 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
67 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
68 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
69 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
70 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73 import scala.concurrent.duration.FiniteDuration;
74
75 /**
76  * Unit tests for RaftActorServerConfigurationSupport.
77  *
78  * @author Thomas Pantelis
79  */
80 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
81     static final String LEADER_ID = "leader";
82     static final String FOLLOWER_ID = "follower";
83     static final String FOLLOWER_ID2 = "follower2";
84     static final String NEW_SERVER_ID = "new-server";
85     static final String NEW_SERVER_ID2 = "new-server2";
86     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
87     private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
88     private static final boolean NO_PERSISTENCE = false;
89     private static final boolean PERSISTENT = true;
90
91     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
92
93     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
94             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
95             actorFactory.generateActorId(FOLLOWER_ID));
96
97     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
98     private ActorRef newFollowerCollectorActor;
99     private RaftActorContext newFollowerActorContext;
100
101     private final TestKit testKit = new TestKit(getSystem());
102
103     @Before
104     public void setup() {
105         InMemoryJournal.clear();
106         InMemorySnapshotStore.clear();
107     }
108
109     @SuppressWarnings("checkstyle:IllegalCatch")
110     private void setupNewFollower() {
111         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
112
113         newFollowerCollectorActor = actorFactory.createActor(MessageCollectorActor.props(),
114                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
115         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
116                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
117                 actorFactory.generateActorId(NEW_SERVER_ID));
118
119         try {
120             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
121         } catch (Exception e) {
122             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
123         }
124     }
125
126     private static DefaultConfigParamsImpl newFollowerConfigParams() {
127         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
128         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
129         configParams.setElectionTimeoutFactor(100000);
130         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
131         return configParams;
132     }
133
134     @After
135     public void tearDown() {
136         actorFactory.close();
137     }
138
139     @Test
140     public void testAddServerWithExistingFollower() {
141         LOG.info("testAddServerWithExistingFollower starting");
142         setupNewFollower();
143         RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
144         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
145                 0, 3, 1).build());
146         followerActorContext.setCommitIndex(2);
147         followerActorContext.setLastApplied(2);
148
149         Follower follower = new Follower(followerActorContext);
150         followerActor.underlyingActor().setBehavior(follower);
151         followerActorContext.setCurrentBehavior(follower);
152
153         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
154                 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, followerActor.path().toString()),
155                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
156                 actorFactory.generateActorId(LEADER_ID));
157
158         // Expect initial heartbeat from the leader.
159         expectFirstMatching(followerActor, AppendEntries.class);
160         clearMessages(followerActor);
161
162         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
163         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
164
165         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
166
167         // Leader should install snapshot - capture and verify ApplySnapshot contents
168
169         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
170         List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
171         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
172
173         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
174         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
175         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().orElseThrow());
176
177         // Verify ServerConfigurationPayload entry in leader's log
178
179         expectFirstMatching(leaderCollectorActor, ApplyState.class);
180         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
181         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
182         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
183         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
184         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
185                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
186
187         // Verify ServerConfigurationPayload entry in both followers
188
189         expectFirstMatching(followerActor, ApplyState.class);
190         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
191         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
192                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
193
194         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
195         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
196         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
197                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
198
199         // Verify new server config was applied in both followers
200
201         assertEquals("Follower peers", Set.of(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
202
203         assertEquals("New follower peers", Set.of(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
204
205         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
206         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
207         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
208         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
209
210         assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
211                 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
212         assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
213                 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
214
215         assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
216                 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
217         assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
218                 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
219
220         LOG.info("testAddServerWithExistingFollower ending");
221     }
222
223     @Test
224     public void testAddServerWithNoExistingFollower() {
225         LOG.info("testAddServerWithNoExistingFollower starting");
226
227         setupNewFollower();
228         RaftActorContext initialActorContext = new MockRaftActorContext();
229         initialActorContext.setCommitIndex(1);
230         initialActorContext.setLastApplied(1);
231         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
232                 0, 2, 1).build());
233
234         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
235                 MockLeaderRaftActor.props(Map.of(), initialActorContext)
236                     .withDispatcher(Dispatchers.DefaultDispatcherId()),
237                 actorFactory.generateActorId(LEADER_ID));
238
239         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
240         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
241
242         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
243
244         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
245
246         // Leader should install snapshot - capture and verify ApplySnapshot contents
247
248         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
249         List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
250         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
251
252         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
253         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
254         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().orElseThrow());
255
256         // Verify ServerConfigurationPayload entry in leader's log
257
258         expectFirstMatching(leaderCollectorActor, ApplyState.class);
259         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
260         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
261         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
262         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
263                 votingServer(NEW_SERVER_ID));
264
265         // Verify ServerConfigurationPayload entry in the new follower
266
267         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
268         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
269         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
270                 votingServer(NEW_SERVER_ID));
271
272         // Verify new server config was applied in the new follower
273
274         assertEquals("New follower peers", Set.of(LEADER_ID), newFollowerActorContext.getPeerIds());
275
276         LOG.info("testAddServerWithNoExistingFollower ending");
277     }
278
279     @Test
280     public void testAddServersAsNonVoting() {
281         LOG.info("testAddServersAsNonVoting starting");
282
283         setupNewFollower();
284         RaftActorContext initialActorContext = new MockRaftActorContext();
285
286         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
287                 MockLeaderRaftActor.props(Map.of(), initialActorContext)
288                     .withDispatcher(Dispatchers.DefaultDispatcherId()),
289                 actorFactory.generateActorId(LEADER_ID));
290
291         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
292         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
293
294         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
295
296         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
297
298         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
299         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
300         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().orElseThrow());
301
302         // Verify ServerConfigurationPayload entry in leader's log
303
304         expectFirstMatching(leaderCollectorActor, ApplyState.class);
305
306         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
307         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
308         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
309         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
310                 nonVotingServer(NEW_SERVER_ID));
311
312         // Verify ServerConfigurationPayload entry in the new follower
313
314         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
315         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
316         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
317                 nonVotingServer(NEW_SERVER_ID));
318
319         // Verify new server config was applied in the new follower
320
321         assertEquals("New follower peers", Set.of(LEADER_ID), newFollowerActorContext.getPeerIds());
322
323         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
324
325         // Add another non-voting server.
326
327         clearMessages(leaderCollectorActor);
328
329         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
330         Follower newFollower2 = new Follower(follower2ActorContext);
331         followerActor.underlyingActor().setBehavior(newFollower2);
332
333         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
334
335         addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
336         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
337         assertEquals("getLeaderHint", Optional.of(LEADER_ID), addServerReply.getLeaderHint());
338
339         expectFirstMatching(leaderCollectorActor, ApplyState.class);
340         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
341         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
342         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
343         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
344                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
345
346         LOG.info("testAddServersAsNonVoting ending");
347     }
348
349     @Test
350     public void testAddServerWithOperationInProgress() {
351         LOG.info("testAddServerWithOperationInProgress starting");
352
353         setupNewFollower();
354         RaftActorContext initialActorContext = new MockRaftActorContext();
355
356         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
357                 MockLeaderRaftActor.props(Map.of(), initialActorContext)
358                     .withDispatcher(Dispatchers.DefaultDispatcherId()),
359                 actorFactory.generateActorId(LEADER_ID));
360
361         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
362         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
363
364         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
365
366         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
367         Follower newFollower2 = new Follower(follower2ActorContext);
368         followerActor.underlyingActor().setBehavior(newFollower2);
369
370         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
371         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
372
373         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
374
375         // Wait for leader's install snapshot and capture it
376
377         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
378
379         // Send a second AddServer - should get queued
380         TestKit testKit2 = new TestKit(getSystem());
381         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
382
383         // Continue the first AddServer
384         newFollowerRaftActorInstance.setDropMessageOfType(null);
385         newFollowerRaftActor.tell(installSnapshot, leaderActor);
386
387         // Verify both complete successfully
388         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
389         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
390
391         addServerReply = testKit2.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
392         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
393
394         // Verify ServerConfigurationPayload entries in leader's log
395
396         expectMatching(leaderCollectorActor, ApplyState.class, 2);
397         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
398         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
399         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
400         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
401                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
402
403         // Verify ServerConfigurationPayload entry in the new follower
404
405         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
406         assertEquals("New follower peers", Set.of(LEADER_ID, NEW_SERVER_ID2), newFollowerActorContext.getPeerIds());
407
408         LOG.info("testAddServerWithOperationInProgress ending");
409     }
410
411     @Test
412     public void testAddServerWithPriorSnapshotInProgress() {
413         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
414
415         setupNewFollower();
416         RaftActorContext initialActorContext = new MockRaftActorContext();
417
418         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
419                 MockLeaderRaftActor.props(Map.of(), initialActorContext)
420                     .withDispatcher(Dispatchers.DefaultDispatcherId()),
421                 actorFactory.generateActorId(LEADER_ID));
422
423         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
424         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
425
426         ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
427
428         // Drop commit message for now to delay snapshot completion
429         leaderRaftActor.setDropMessageOfType(String.class);
430
431         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
432
433         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
434
435         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
436
437         leaderRaftActor.setDropMessageOfType(null);
438         leaderActor.tell(commitMsg, leaderActor);
439
440         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
441         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
442         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().orElseThrow());
443
444         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
445
446         // Verify ServerConfigurationPayload entry in leader's log
447
448         expectFirstMatching(leaderCollectorActor, ApplyState.class);
449         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
450         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
451         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
452         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
453                 votingServer(NEW_SERVER_ID));
454
455         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
456     }
457
458     @Test
459     public void testAddServerWithPriorSnapshotCompleteTimeout() {
460         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
461
462         setupNewFollower();
463         RaftActorContext initialActorContext = new MockRaftActorContext();
464
465         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
466                 MockLeaderRaftActor.props(Map.of(), initialActorContext)
467                     .withDispatcher(Dispatchers.DefaultDispatcherId()),
468                 actorFactory.generateActorId(LEADER_ID));
469
470         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
471         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
472
473         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
474
475         // Drop commit message so the snapshot doesn't complete.
476         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
477
478         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
479
480         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
481
482         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
483         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
484
485         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
486
487         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
488     }
489
490     @Test
491     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() {
492         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
493
494         setupNewFollower();
495         RaftActorContext initialActorContext = new MockRaftActorContext();
496
497         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
498                 MockLeaderRaftActor.props(Map.of(), initialActorContext)
499                     .withDispatcher(Dispatchers.DefaultDispatcherId()),
500                 actorFactory.generateActorId(LEADER_ID));
501
502         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
503         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
504         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
505
506         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
507
508         // Drop the commit message so the snapshot doesn't complete yet.
509         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
510
511         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
512
513         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
514
515         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
516
517         // Change the leader behavior to follower
518         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
519
520         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
521         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
522         // isCapturing assertion below.
523         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
524
525         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
526         leaderActor.tell(commitMsg, leaderActor);
527
528         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
529
530         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
531         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
532
533         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
534         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
535
536         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
537     }
538
539     @Test
540     public void testAddServerWithLeaderChangeDuringInstallSnapshot() {
541         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
542
543         setupNewFollower();
544         RaftActorContext initialActorContext = new MockRaftActorContext();
545
546         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
547                 MockLeaderRaftActor.props(Map.of(), initialActorContext)
548                     .withDispatcher(Dispatchers.DefaultDispatcherId()),
549                 actorFactory.generateActorId(LEADER_ID));
550
551         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
552         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
553
554         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
555
556         ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
557
558         // Drop the UnInitializedFollowerSnapshotReply to delay it.
559         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
560
561         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
562
563         final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
564                 UnInitializedFollowerSnapshotReply.class);
565
566         // Prevent election timeout when the leader switches to follower
567         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
568
569         // Change the leader behavior to follower
570         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
571
572         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
573         leaderRaftActor.setDropMessageOfType(null);
574         leaderActor.tell(snapshotReply, leaderActor);
575
576         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
577         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
578
579         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
580
581         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
582     }
583
584     @Test
585     public void testAddServerWithInstallSnapshotTimeout() {
586         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
587
588         setupNewFollower();
589         RaftActorContext initialActorContext = new MockRaftActorContext();
590
591         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
592                 MockLeaderRaftActor.props(Map.of(), initialActorContext)
593                     .withDispatcher(Dispatchers.DefaultDispatcherId()),
594                 actorFactory.generateActorId(LEADER_ID));
595
596         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
597         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
598         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
599
600         // Drop the InstallSnapshot message so it times out
601         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
602
603         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
604
605         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
606
607         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
608         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
609
610         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
611         assertEquals("Leader followers size", 0,
612                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
613
614         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
615     }
616
617     @Test
618     public void testAddServerWithNoLeader() {
619         LOG.info("testAddServerWithNoLeader starting");
620
621         setupNewFollower();
622         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
623         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
624
625         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
626                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(Map.of(FOLLOWER_ID,
627                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
628                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
629                 actorFactory.generateActorId(LEADER_ID));
630         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
631
632         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
633                 testKit.getRef());
634         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
635         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
636
637         LOG.info("testAddServerWithNoLeader ending");
638     }
639
640     @Test
641     public void testAddServerWithNoConsensusReached() {
642         LOG.info("testAddServerWithNoConsensusReached starting");
643
644         setupNewFollower();
645         RaftActorContext initialActorContext = new MockRaftActorContext();
646
647         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
648                 MockLeaderRaftActor.props(Map.of(), initialActorContext)
649                     .withDispatcher(Dispatchers.DefaultDispatcherId()),
650                 actorFactory.generateActorId(LEADER_ID));
651
652         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
653         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
654
655         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
656
657         // Drop UnInitializedFollowerSnapshotReply initially
658         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
659
660         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
661         newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
662
663         // Drop AppendEntries to the new follower so consensus isn't reached
664         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
665
666         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
667
668         // Capture the UnInitializedFollowerSnapshotReply
669         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
670
671         // Send the UnInitializedFollowerSnapshotReply to resume the first request
672         leaderRaftActor.setDropMessageOfType(null);
673         leaderActor.tell(snapshotReply, leaderActor);
674
675         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
676
677         // Send a second AddServer
678         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
679
680         // The first AddServer should succeed with OK even though consensus wasn't reached
681         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
682         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
683         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().orElseThrow());
684
685         // Verify ServerConfigurationPayload entry in leader's log
686         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
687                 votingServer(NEW_SERVER_ID));
688
689         // The second AddServer should fail since consensus wasn't reached for the first
690         addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
691         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
692
693         // Re-send the second AddServer - should also fail
694         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
695         addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
696         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
697
698         LOG.info("testAddServerWithNoConsensusReached ending");
699     }
700
701     @Test
702     public void testAddServerWithExistingServer() {
703         LOG.info("testAddServerWithExistingServer starting");
704
705         RaftActorContext initialActorContext = new MockRaftActorContext();
706
707         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
708                 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, followerActor.path().toString()),
709                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
710                 actorFactory.generateActorId(LEADER_ID));
711
712         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
713
714         AddServerReply addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
715         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
716
717         LOG.info("testAddServerWithExistingServer ending");
718     }
719
720     @Test
721     public void testAddServerForwardedToLeader() {
722         LOG.info("testAddServerForwardedToLeader starting");
723
724         setupNewFollower();
725         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
726         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
727
728         ActorRef leaderActor = actorFactory.createActor(
729                 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
730
731         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
732                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(Map.of(LEADER_ID,
733                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
734                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
735                 actorFactory.generateActorId(FOLLOWER_ID));
736         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
737
738         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, List.of(), -1, -1, (short)0), leaderActor);
739
740         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
741                 testKit.getRef());
742         expectFirstMatching(leaderActor, AddServer.class);
743
744         LOG.info("testAddServerForwardedToLeader ending");
745     }
746
747     @Test
748     public void testOnApplyState() {
749         LOG.info("testOnApplyState starting");
750
751         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
752         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
753         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
754                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(Map.of(FOLLOWER_ID,
755                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
756                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
757                 actorFactory.generateActorId(LEADER_ID));
758
759         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
760                 noLeaderActor.underlyingActor());
761
762         ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
763                 new ServerConfigurationPayload(List.of()));
764         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
765         assertEquals("Message handled", true, handled);
766
767         ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
768                 new MockRaftActorContext.MockPayload("1"));
769         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
770         assertEquals("Message handled", false, handled);
771
772         LOG.info("testOnApplyState ending");
773     }
774
775     @Test
776     public void testRemoveServerWithNoLeader() {
777         LOG.info("testRemoveServerWithNoLeader starting");
778
779         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
780         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
781
782         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
783                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(Map.of(FOLLOWER_ID,
784                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
785                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
786                 actorFactory.generateActorId(LEADER_ID));
787         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
788
789         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
790         RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
791         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
792
793         LOG.info("testRemoveServerWithNoLeader ending");
794     }
795
796     @Test
797     public void testRemoveServerNonExistentServer() {
798         LOG.info("testRemoveServerNonExistentServer starting");
799
800         RaftActorContext initialActorContext = new MockRaftActorContext();
801
802         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
803                 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, followerActor.path().toString()),
804                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
805                 actorFactory.generateActorId(LEADER_ID));
806
807         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
808         RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
809         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
810
811         LOG.info("testRemoveServerNonExistentServer ending");
812     }
813
814     @Test
815     public void testRemoveServerForwardToLeader() {
816         LOG.info("testRemoveServerForwardToLeader starting");
817
818         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
819         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
820
821         ActorRef leaderActor = actorFactory.createTestActor(
822                 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
823
824         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
825                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(Map.of(LEADER_ID,
826                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
827                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
828                 actorFactory.generateActorId(FOLLOWER_ID));
829         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
830
831         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, List.of(), -1, -1, (short)0), leaderActor);
832
833         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
834         expectFirstMatching(leaderActor, RemoveServer.class);
835
836         LOG.info("testRemoveServerForwardToLeader ending");
837     }
838
839     @Test
840     public void testRemoveServer() {
841         LOG.info("testRemoveServer starting");
842
843         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
844         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
845         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
846
847         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
848         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
849         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
850         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
851         RaftActorContext initialActorContext = new MockRaftActorContext();
852
853         final String downNodeId = "downNode";
854         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
855                 Map.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
856                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
857                 actorFactory.generateActorId(LEADER_ID));
858
859         final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
860
861         ActorRef follower1Collector = actorFactory.createActor(
862                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
863         final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
864                 CollectingMockRaftActor.props(FOLLOWER_ID, Map.of(LEADER_ID, leaderActor.path().toString(),
865                         FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
866                         follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
867
868         ActorRef follower2Collector = actorFactory.createActor(
869                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
870         final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
871                 CollectingMockRaftActor.props(FOLLOWER_ID2, Map.of(LEADER_ID, leaderActor.path().toString(),
872                         FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
873                         follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
874
875         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
876         follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
877         follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
878
879         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
880         RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
881         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
882
883         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
884         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
885         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
886                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
887
888         applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
889         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
890         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
891                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
892
893         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
894         assertTrue("Expected Leader", currentBehavior instanceof Leader);
895         assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
896
897         MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
898
899         LOG.info("testRemoveServer ending");
900     }
901
902     @Test
903     public void testRemoveServerLeader() {
904         LOG.info("testRemoveServerLeader starting");
905
906         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
907         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
908         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
909
910         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
911         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
912         RaftActorContext initialActorContext = new MockRaftActorContext();
913
914         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
915                 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, followerActorPath),
916                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
917                 actorFactory.generateActorId(LEADER_ID));
918
919         final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
920
921         final ActorRef followerCollector =
922                 actorFactory.createActor(MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
923         actorFactory.createTestActor(
924                 CollectingMockRaftActor.props(FOLLOWER_ID, Map.of(LEADER_ID, leaderActor.path().toString()),
925                         configParams, NO_PERSISTENCE, followerCollector)
926                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
927                 followerActorId);
928
929         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
930         RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
931         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
932
933         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
934         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
935         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
936                 votingServer(FOLLOWER_ID));
937
938         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
939
940         LOG.info("testRemoveServerLeader ending");
941     }
942
943     @Test
944     public void testRemoveServerLeaderWithNoFollowers() {
945         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
946
947         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
948                 MockLeaderRaftActor.props(Map.of(),
949                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
950                 actorFactory.generateActorId(LEADER_ID));
951
952         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
953         RemoveServerReply removeServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), RemoveServerReply.class);
954         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
955
956         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
957     }
958
959     @Test
960     public void testChangeServersVotingStatus() {
961         LOG.info("testChangeServersVotingStatus starting");
962
963         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
964         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
965         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
966
967         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
968         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
969         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
970         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
971
972         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
973                 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, follower1ActorPath,
974                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
975                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
976         ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
977
978         ActorRef follower1Collector = actorFactory.createActor(
979                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
980         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
981                 CollectingMockRaftActor.props(FOLLOWER_ID, Map.of(LEADER_ID, leaderActor.path().toString(),
982                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
983                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
984
985         ActorRef follower2Collector = actorFactory.createActor(
986                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
987         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
988                 CollectingMockRaftActor.props(FOLLOWER_ID2, Map.of(LEADER_ID, leaderActor.path().toString(),
989                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
990                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
991
992         // Send first ChangeServersVotingStatus message
993
994         leaderActor.tell(new ChangeServersVotingStatus(Map.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
995                 testKit.getRef());
996         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
997         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
998
999         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1000         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1001         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1002                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1003
1004         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1005         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1006                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1007                 nonVotingServer(FOLLOWER_ID2));
1008
1009         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1010         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1011                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1012                 nonVotingServer(FOLLOWER_ID2));
1013
1014         MessageCollectorActor.clearMessages(leaderCollector);
1015         MessageCollectorActor.clearMessages(follower1Collector);
1016         MessageCollectorActor.clearMessages(follower2Collector);
1017
1018         // Send second ChangeServersVotingStatus message
1019
1020         leaderActor.tell(new ChangeServersVotingStatus(Map.of(FOLLOWER_ID, true)), testKit.getRef());
1021         reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1022         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1023
1024         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1025         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1026                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1027
1028         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1029         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1030                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1031
1032         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1033         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1034                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1035
1036         LOG.info("testChangeServersVotingStatus ending");
1037     }
1038
1039     @Test
1040     public void testChangeLeaderToNonVoting() {
1041         LOG.info("testChangeLeaderToNonVoting starting");
1042
1043         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1044         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1045
1046         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1047         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1048         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1049         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1050
1051         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1052                 MockLeaderRaftActor.props(Map.of(FOLLOWER_ID, follower1ActorPath,
1053                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1054                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1055         ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1056
1057         ActorRef follower1Collector = actorFactory.createActor(
1058                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1059         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1060                 CollectingMockRaftActor.props(FOLLOWER_ID, Map.of(LEADER_ID, leaderActor.path().toString(),
1061                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1062                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1063
1064         ActorRef follower2Collector = actorFactory.createActor(
1065                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1066         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1067                 CollectingMockRaftActor.props(FOLLOWER_ID2, Map.of(LEADER_ID, leaderActor.path().toString(),
1068                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1069                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1070
1071         // Send ChangeServersVotingStatus message
1072
1073         leaderActor.tell(new ChangeServersVotingStatus(Map.of(LEADER_ID, false)), testKit.getRef());
1074         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1075         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1076
1077         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1078         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1079                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1080
1081         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1082         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1083                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1084
1085         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1086         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1087                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1088
1089         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1090         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1091
1092         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1093
1094         LOG.info("testChangeLeaderToNonVoting ending");
1095     }
1096
1097     @Test
1098     public void testChangeLeaderToNonVotingInSingleNode() {
1099         LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1100
1101         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1102                 MockLeaderRaftActor.props(Map.of(), new MockRaftActorContext())
1103                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1104
1105         leaderActor.tell(new ChangeServersVotingStatus(Map.of(LEADER_ID, false)), testKit.getRef());
1106         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1107         assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1108
1109         LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1110     }
1111
1112     @Test
1113     public void testChangeToVotingWithNoLeader() {
1114         LOG.info("testChangeToVotingWithNoLeader starting");
1115
1116         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1117         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1118         configParams.setElectionTimeoutFactor(5);
1119
1120         final String node1ID = "node1";
1121         final String node2ID = "node2";
1122
1123         // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1124         // via the server config. The server config will also contain 2 voting peers that are down (ie no
1125         // actors created).
1126
1127         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
1128                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1129                 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1130         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1131
1132         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1133         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1134         InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1135         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1136         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1137         InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1138
1139         ActorRef node1Collector = actorFactory.createActor(
1140                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1141         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1142                 CollectingMockRaftActor.props(node1ID, Map.of(), configParams,
1143                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1144         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1145
1146         ActorRef node2Collector = actorFactory.createActor(
1147                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1148         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1149                 CollectingMockRaftActor.props(node2ID, Map.of(), configParams,
1150                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1151         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1152
1153         node1RaftActor.waitForInitializeBehaviorComplete();
1154         node2RaftActor.waitForInitializeBehaviorComplete();
1155
1156         // Verify the intended server config was loaded and applied.
1157         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1158                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1159                 votingServer("downNode2"));
1160         assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1161         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1162         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1163
1164         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1165                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1166                 votingServer("downNode2"));
1167         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1168
1169         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1170         // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1171         // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1172         // down nodes are switched to non-voting, node1 should only need a vote from node2.
1173
1174         // First send the message such that node1 has no peer address for node2 - should fail.
1175
1176         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(Map.of(node1ID, true,
1177                 node2ID, true, "downNode1", false, "downNode2", false));
1178         node1RaftActorRef.tell(changeServers, testKit.getRef());
1179         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1180         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1181         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1182
1183         // Send an AppendEntries so node1 has a leaderId
1184
1185         long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1186         node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1187                 List.of(), 0, -1, (short)1), ActorRef.noSender());
1188
1189         // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the next
1190         // ChangeServersVotingStatus message, it will try to elect a leader.
1191
1192         AbstractRaftActorIntegrationTest.verifyRaftState(node1RaftActorRef,
1193             rs -> assertEquals("getLeader", null, rs.getLeader()));
1194
1195         // Update node2's peer address and send the message again
1196
1197         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1198
1199         node1RaftActorRef.tell(changeServers, testKit.getRef());
1200         reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1201         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1202
1203         ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1204                 ApplyJournalEntries.class);
1205         assertEquals("getToIndex", 1, apply.getToIndex());
1206         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1207                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1208                 nonVotingServer("downNode2"));
1209         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1210         assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1211
1212         apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1213         assertEquals("getToIndex", 1, apply.getToIndex());
1214         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1215                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1216                 nonVotingServer("downNode2"));
1217         assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1218         assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1219
1220         LOG.info("testChangeToVotingWithNoLeader ending");
1221     }
1222
1223     @Test
1224     public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1225         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1226
1227         final String node1ID = "node1";
1228         final String node2ID = "node2";
1229
1230         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1231                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1232                         ? actorFactory.createTestActorPath(node2ID) : null;
1233
1234         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
1235                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1236         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1237
1238         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1239         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1240         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1241         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1242
1243         DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1244         configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1245         configParams1.setElectionTimeoutFactor(1);
1246         configParams1.setPeerAddressResolver(peerAddressResolver);
1247         ActorRef node1Collector = actorFactory.createActor(
1248                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1249         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1250                 CollectingMockRaftActor.props(node1ID, Map.of(), configParams1,
1251                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1252         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1253
1254         DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1255         configParams2.setElectionTimeoutFactor(1000000);
1256         configParams2.setPeerAddressResolver(peerAddressResolver);
1257         ActorRef node2Collector = actorFactory.createActor(
1258                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1259         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1260                 CollectingMockRaftActor.props(node2ID, Map.of(), configParams2,
1261                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1262         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1263
1264         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1265         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1266         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1267         // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1268         // node2 was previously voting.
1269
1270         node2RaftActor.setDropMessageOfType(RequestVote.class);
1271
1272         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(Map.of(node1ID, true));
1273         node1RaftActorRef.tell(changeServers, testKit.getRef());
1274         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1275         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1276
1277         assertEquals("Server config", Set.of(nonVotingServer(node1ID), votingServer(node2ID)),
1278             Set.copyOf(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1279         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1280
1281         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1282     }
1283
1284     @Test
1285     public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1286         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1287
1288         final String node1ID = "node1";
1289         final String node2ID = "node2";
1290
1291         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1292                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1293                         ? actorFactory.createTestActorPath(node2ID) : null;
1294
1295         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1296         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1297         configParams.setElectionTimeoutFactor(3);
1298         configParams.setPeerAddressResolver(peerAddressResolver);
1299
1300         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
1301                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1302         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1303
1304         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1305         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1306         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1307         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1308         InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1309                 new MockRaftActorContext.MockPayload("2")));
1310         InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1311
1312         ActorRef node1Collector = actorFactory.createActor(
1313                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1314         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1315                 CollectingMockRaftActor.props(node1ID, Map.of(), configParams,
1316                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1317         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1318
1319         ActorRef node2Collector = actorFactory.createActor(
1320                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1321         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1322                 CollectingMockRaftActor.props(node2ID, Map.of(), configParams,
1323                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1324         final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1325
1326         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1327         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1328         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1329         // forward the request to node2.
1330
1331         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1332                 Map.of(node1ID, true, node2ID, true));
1333         node1RaftActorRef.tell(changeServers, testKit.getRef());
1334         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1335         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1336
1337         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1338         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1339                 votingServer(node1ID), votingServer(node2ID));
1340         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1341
1342         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1343         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1344                 votingServer(node1ID), votingServer(node2ID));
1345         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1346         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1347
1348         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1349     }
1350
1351     @Test
1352     public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1353         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1354
1355         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1356         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1357         configParams.setElectionTimeoutFactor(100000);
1358
1359         final String node1ID = "node1";
1360         final String node2ID = "node2";
1361
1362         configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1363                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1364                         ? actorFactory.createTestActorPath(node2ID) : null);
1365
1366         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
1367                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1368         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1369
1370         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1371         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1372         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1373         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1374
1375         ActorRef node1Collector = actorFactory.createActor(
1376                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1377         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1378                 CollectingMockRaftActor.props(node1ID, Map.of(), configParams,
1379                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1380         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1381
1382         ActorRef node2Collector = actorFactory.createActor(
1383                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1384         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1385                 CollectingMockRaftActor.props(node2ID, Map.of(), configParams,
1386                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1387         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1388
1389         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1390         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1391         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1392         // request to node2 when node2 is elected.
1393
1394         node2RaftActor.setDropMessageOfType(RequestVote.class);
1395
1396         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(Map.of(node1ID, true,
1397                 node2ID, true));
1398         node1RaftActorRef.tell(changeServers, testKit.getRef());
1399
1400         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1401
1402         node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1403
1404         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
1405         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1406
1407         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1408         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1409                 votingServer(node1ID), votingServer(node2ID));
1410         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1411         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1412
1413         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1414         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1415                 votingServer(node1ID), votingServer(node2ID));
1416         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1417
1418         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1419     }
1420
1421     private static void verifyRaftState(final RaftState expState, final RaftActor... raftActors) {
1422         Stopwatch sw = Stopwatch.createStarted();
1423         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1424             for (RaftActor raftActor : raftActors) {
1425                 if (raftActor.getRaftState() == expState) {
1426                     return;
1427                 }
1428             }
1429         }
1430
1431         fail("None of the RaftActors have state " + expState);
1432     }
1433
1434     private static ServerInfo votingServer(final String id) {
1435         return new ServerInfo(id, true);
1436     }
1437
1438     private static ServerInfo nonVotingServer(final String id) {
1439         return new ServerInfo(id, false);
1440     }
1441
1442     private ActorRef newLeaderCollectorActor(final MockLeaderRaftActor leaderRaftActor) {
1443         return newCollectorActor(leaderRaftActor, LEADER_ID);
1444     }
1445
1446     private ActorRef newCollectorActor(final AbstractMockRaftActor raftActor, final String id) {
1447         ActorRef collectorActor = actorFactory.createTestActor(
1448                 MessageCollectorActor.props(), actorFactory.generateActorId(id + "Collector"));
1449         raftActor.setCollectorActor(collectorActor);
1450         return collectorActor;
1451     }
1452
1453     private static void verifyServerConfigurationPayloadEntry(final ReplicatedLog log, final ServerInfo... expected) {
1454         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1455         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1456         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1457         assertEquals("Server config", Set.of(expected), Set.copyOf(payload.getServerConfig()));
1458     }
1459
1460     private static RaftActorContextImpl newFollowerContext(final String id,
1461             final TestActorRef<? extends AbstractActor> actor) {
1462         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1463         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1464         configParams.setElectionTimeoutFactor(100000);
1465         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
1466         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1467         termInfo.update(1, LEADER_ID);
1468         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1469                 id, termInfo, -1, -1, Map.of(LEADER_ID, ""), configParams,
1470                 noPersistence, applyState -> actor.tell(applyState, actor), LOG,  MoreExecutors.directExecutor());
1471     }
1472
1473     abstract static class AbstractMockRaftActor extends MockRaftActor {
1474         private volatile ActorRef collectorActor;
1475         private volatile Class<?> dropMessageOfType;
1476
1477         AbstractMockRaftActor(final String id, final Map<String, String> peerAddresses,
1478                 final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
1479             super(builder().id(id).peerAddresses(peerAddresses).config(config.orElseThrow())
1480                     .persistent(Optional.of(persistent)));
1481             this.collectorActor = collectorActor;
1482         }
1483
1484         void setDropMessageOfType(final Class<?> dropMessageOfType) {
1485             this.dropMessageOfType = dropMessageOfType;
1486         }
1487
1488         void setCollectorActor(final ActorRef collectorActor) {
1489             this.collectorActor = collectorActor;
1490         }
1491
1492         @Override
1493         public void handleCommand(final Object message) {
1494             if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1495                 super.handleCommand(message);
1496             }
1497
1498             if (collectorActor != null) {
1499                 collectorActor.tell(message, getSender());
1500             }
1501         }
1502     }
1503
1504     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1505
1506         CollectingMockRaftActor(final String id, final Map<String, String> peerAddresses,
1507                 final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
1508             super(id, peerAddresses, config, persistent, collectorActor);
1509             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1510                 @Override
1511                 public void createSnapshot(final ActorRef actorRef,
1512                         final Optional<OutputStream> installSnapshotStream) {
1513                     actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
1514                 }
1515
1516                 @Override
1517                 public void applySnapshot(
1518                         final org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
1519                 }
1520
1521                 @Override
1522                 public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
1523                         final ByteSource snapshotBytes) {
1524                     throw new UnsupportedOperationException();
1525                 }
1526             };
1527         }
1528
1529         public static Props props(final String id, final Map<String, String> peerAddresses,
1530                 final ConfigParams config, final boolean persistent, final ActorRef collectorActor) {
1531
1532             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1533                     persistent, collectorActor);
1534         }
1535
1536     }
1537
1538     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1539         public MockLeaderRaftActor(final Map<String, String> peerAddresses, final ConfigParams config,
1540                 final RaftActorContext fromContext) {
1541             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1542             setPersistence(false);
1543
1544             RaftActorContext context = getRaftActorContext();
1545             for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1546                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1547                 getState().add(entry.getData());
1548                 context.getReplicatedLog().append(entry);
1549             }
1550
1551             context.setCommitIndex(fromContext.getCommitIndex());
1552             context.setLastApplied(fromContext.getLastApplied());
1553             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1554                     fromContext.getTermInformation().getVotedFor());
1555         }
1556
1557         @Override
1558         protected void initializeBehavior() {
1559             changeCurrentBehavior(new Leader(getRaftActorContext()));
1560             initializeBehaviorComplete.countDown();
1561         }
1562
1563         @Override
1564         @SuppressWarnings("checkstyle:IllegalCatch")
1565         public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
1566             MockSnapshotState snapshotState = new MockSnapshotState(List.copyOf(getState()));
1567             if (installSnapshotStream.isPresent()) {
1568                 SerializationUtils.serialize(snapshotState, installSnapshotStream.orElseThrow());
1569             }
1570
1571             actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
1572         }
1573
1574         static Props props(final Map<String, String> peerAddresses, final RaftActorContext fromContext) {
1575             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1576             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1577             configParams.setElectionTimeoutFactor(10);
1578             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1579         }
1580     }
1581
1582     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1583         public MockNewFollowerRaftActor(final ConfigParams config, final ActorRef collectorActor) {
1584             super(NEW_SERVER_ID, Map.of(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1585             setPersistence(false);
1586         }
1587
1588         static Props props(final ConfigParams config, final ActorRef collectorActor) {
1589             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1590         }
1591     }
1592 }