b1231d62d345b3477df95b4837da3c8ce2bcb46f
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17
18 import akka.actor.ActorRef;
19 import akka.actor.Props;
20 import akka.actor.UntypedActor;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.collect.Maps;
28 import com.google.common.collect.Sets;
29 import com.google.common.io.ByteSource;
30 import java.io.OutputStream;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.TimeUnit;
37 import org.apache.commons.lang3.SerializationUtils;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
42 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
44 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
46 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
47 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
48 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
49 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
50 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
51 import org.opendaylight.controller.cluster.raft.messages.AddServer;
52 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
53 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
54 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
55 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
56 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
57 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
58 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
59 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
60 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
61 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
62 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
63 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
65 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
67 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
69 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
70 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76 import scala.concurrent.duration.FiniteDuration;
77
78 /**
79  * Unit tests for RaftActorServerConfigurationSupport.
80  *
81  * @author Thomas Pantelis
82  */
83 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
84     static final String LEADER_ID = "leader";
85     static final String FOLLOWER_ID = "follower";
86     static final String FOLLOWER_ID2 = "follower2";
87     static final String NEW_SERVER_ID = "new-server";
88     static final String NEW_SERVER_ID2 = "new-server2";
89     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
90     private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
91     private static final boolean NO_PERSISTENCE = false;
92     private static final boolean PERSISTENT = true;
93
94     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
95
96     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
97             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
98             actorFactory.generateActorId(FOLLOWER_ID));
99
100     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
101     private ActorRef newFollowerCollectorActor;
102     private RaftActorContext newFollowerActorContext;
103
104     private final TestKit testKit = new TestKit(getSystem());
105
106     @Before
107     public void setup() {
108         InMemoryJournal.clear();
109         InMemorySnapshotStore.clear();
110     }
111
112     @SuppressWarnings("checkstyle:IllegalCatch")
113     private void setupNewFollower() {
114         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
115
116         newFollowerCollectorActor = actorFactory.createActor(MessageCollectorActor.props(),
117                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
118         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
119                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
120                 actorFactory.generateActorId(NEW_SERVER_ID));
121
122         try {
123             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
124         } catch (Exception e) {
125             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
126         }
127     }
128
129     private static DefaultConfigParamsImpl newFollowerConfigParams() {
130         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
131         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
132         configParams.setElectionTimeoutFactor(100000);
133         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
134         return configParams;
135     }
136
137     @After
138     public void tearDown() {
139         actorFactory.close();
140     }
141
142     @Test
143     public void testAddServerWithExistingFollower() {
144         LOG.info("testAddServerWithExistingFollower starting");
145         setupNewFollower();
146         RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
147         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
148                 0, 3, 1).build());
149         followerActorContext.setCommitIndex(2);
150         followerActorContext.setLastApplied(2);
151
152         Follower follower = new Follower(followerActorContext);
153         followerActor.underlyingActor().setBehavior(follower);
154         followerActorContext.setCurrentBehavior(follower);
155
156         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
157                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
158                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
159                 actorFactory.generateActorId(LEADER_ID));
160
161         // Expect initial heartbeat from the leader.
162         expectFirstMatching(followerActor, AppendEntries.class);
163         clearMessages(followerActor);
164
165         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
166         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
167
168         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
169
170         // Leader should install snapshot - capture and verify ApplySnapshot contents
171
172         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
173         List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
174         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
175
176         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
177         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
178         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
179
180         // Verify ServerConfigurationPayload entry in leader's log
181
182         expectFirstMatching(leaderCollectorActor, ApplyState.class);
183         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
184         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
185         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
186         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
187         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
188                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
189
190         // Verify ServerConfigurationPayload entry in both followers
191
192         expectFirstMatching(followerActor, ApplyState.class);
193         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
194         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
195                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
196
197         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
198         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
199         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
200                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
201
202         // Verify new server config was applied in both followers
203
204         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
205
206         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
207                 newFollowerActorContext.getPeerIds());
208
209         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
210         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
211         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
212         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
213
214         assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
215                 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
216         assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
217                 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
218
219         assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
220                 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
221         assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
222                 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
223
224         LOG.info("testAddServerWithExistingFollower ending");
225     }
226
227     @Test
228     public void testAddServerWithNoExistingFollower() {
229         LOG.info("testAddServerWithNoExistingFollower starting");
230
231         setupNewFollower();
232         RaftActorContext initialActorContext = new MockRaftActorContext();
233         initialActorContext.setCommitIndex(1);
234         initialActorContext.setLastApplied(1);
235         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
236                 0, 2, 1).build());
237
238         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
239                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
240                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
241                 actorFactory.generateActorId(LEADER_ID));
242
243         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
244         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
245
246         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
247
248         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
249
250         // Leader should install snapshot - capture and verify ApplySnapshot contents
251
252         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
253         List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
254         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
255
256         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
257         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
258         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
259
260         // Verify ServerConfigurationPayload entry in leader's log
261
262         expectFirstMatching(leaderCollectorActor, ApplyState.class);
263         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
264         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
265         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
266         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
267                 votingServer(NEW_SERVER_ID));
268
269         // Verify ServerConfigurationPayload entry in the new follower
270
271         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
272         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
273         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
274                 votingServer(NEW_SERVER_ID));
275
276         // Verify new server config was applied in the new follower
277
278         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
279
280         LOG.info("testAddServerWithNoExistingFollower ending");
281     }
282
283     @Test
284     public void testAddServersAsNonVoting() {
285         LOG.info("testAddServersAsNonVoting starting");
286
287         setupNewFollower();
288         RaftActorContext initialActorContext = new MockRaftActorContext();
289
290         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
291                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
292                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
293                 actorFactory.generateActorId(LEADER_ID));
294
295         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
296         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
297
298         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
299
300         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
301
302         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
303         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
304         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
305
306         // Verify ServerConfigurationPayload entry in leader's log
307
308         expectFirstMatching(leaderCollectorActor, ApplyState.class);
309
310         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
311         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
312         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
313         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
314                 nonVotingServer(NEW_SERVER_ID));
315
316         // Verify ServerConfigurationPayload entry in the new follower
317
318         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
319         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
320         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
321                 nonVotingServer(NEW_SERVER_ID));
322
323         // Verify new server config was applied in the new follower
324
325         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
326
327         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
328
329         // Add another non-voting server.
330
331         clearMessages(leaderCollectorActor);
332
333         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
334         Follower newFollower2 = new Follower(follower2ActorContext);
335         followerActor.underlyingActor().setBehavior(newFollower2);
336
337         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
338
339         addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
340         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
341         assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
342
343         expectFirstMatching(leaderCollectorActor, ApplyState.class);
344         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
345         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
346         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
347         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
348                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
349
350         LOG.info("testAddServersAsNonVoting ending");
351     }
352
353     @Test
354     public void testAddServerWithOperationInProgress() {
355         LOG.info("testAddServerWithOperationInProgress starting");
356
357         setupNewFollower();
358         RaftActorContext initialActorContext = new MockRaftActorContext();
359
360         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
361                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
362                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
363                 actorFactory.generateActorId(LEADER_ID));
364
365         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
366         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
367
368         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
369
370         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
371         Follower newFollower2 = new Follower(follower2ActorContext);
372         followerActor.underlyingActor().setBehavior(newFollower2);
373
374         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
375         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
376
377         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
378
379         // Wait for leader's install snapshot and capture it
380
381         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
382
383         // Send a second AddServer - should get queued
384         TestKit testKit2 = new TestKit(getSystem());
385         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
386
387         // Continue the first AddServer
388         newFollowerRaftActorInstance.setDropMessageOfType(null);
389         newFollowerRaftActor.tell(installSnapshot, leaderActor);
390
391         // Verify both complete successfully
392         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
393         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
394
395         addServerReply = testKit2.expectMsgClass(testKit2.duration("5 seconds"), AddServerReply.class);
396         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
397
398         // Verify ServerConfigurationPayload entries in leader's log
399
400         expectMatching(leaderCollectorActor, ApplyState.class, 2);
401         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
402         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
403         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
404         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
405                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
406
407         // Verify ServerConfigurationPayload entry in the new follower
408
409         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
410         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
411                newFollowerActorContext.getPeerIds());
412
413         LOG.info("testAddServerWithOperationInProgress ending");
414     }
415
416     @Test
417     public void testAddServerWithPriorSnapshotInProgress() {
418         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
419
420         setupNewFollower();
421         RaftActorContext initialActorContext = new MockRaftActorContext();
422
423         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
424                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
425                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
426                 actorFactory.generateActorId(LEADER_ID));
427
428         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
429         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
430
431         ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
432
433         // Drop commit message for now to delay snapshot completion
434         leaderRaftActor.setDropMessageOfType(String.class);
435
436         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
437
438         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
439
440         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
441
442         leaderRaftActor.setDropMessageOfType(null);
443         leaderActor.tell(commitMsg, leaderActor);
444
445         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
446         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
447         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
448
449         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
450
451         // Verify ServerConfigurationPayload entry in leader's log
452
453         expectFirstMatching(leaderCollectorActor, ApplyState.class);
454         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
455         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
456         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
457         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
458                 votingServer(NEW_SERVER_ID));
459
460         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
461     }
462
463     @Test
464     public void testAddServerWithPriorSnapshotCompleteTimeout() {
465         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
466
467         setupNewFollower();
468         RaftActorContext initialActorContext = new MockRaftActorContext();
469
470         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
471                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
472                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
473                 actorFactory.generateActorId(LEADER_ID));
474
475         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
476         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
477
478         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
479
480         // Drop commit message so the snapshot doesn't complete.
481         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
482
483         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
484
485         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
486
487         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
488         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
489
490         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
491
492         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
493     }
494
495     @Test
496     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() {
497         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
498
499         setupNewFollower();
500         RaftActorContext initialActorContext = new MockRaftActorContext();
501
502         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
503                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
504                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
505                 actorFactory.generateActorId(LEADER_ID));
506
507         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
508         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
509         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
510
511         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
512
513         // Drop the commit message so the snapshot doesn't complete yet.
514         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
515
516         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
517
518         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
519
520         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
521
522         // Change the leader behavior to follower
523         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
524
525         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
526         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
527         // isCapturing assertion below.
528         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
529
530         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
531         leaderActor.tell(commitMsg, leaderActor);
532
533         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
534
535         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
536         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
537
538         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
539         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
540
541         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
542     }
543
544     @Test
545     public void testAddServerWithLeaderChangeDuringInstallSnapshot() {
546         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
547
548         setupNewFollower();
549         RaftActorContext initialActorContext = new MockRaftActorContext();
550
551         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
552                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
553                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
554                 actorFactory.generateActorId(LEADER_ID));
555
556         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
557         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
558
559         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
560
561         ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
562
563         // Drop the UnInitializedFollowerSnapshotReply to delay it.
564         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
565
566         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
567
568         final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
569                 UnInitializedFollowerSnapshotReply.class);
570
571         // Prevent election timeout when the leader switches to follower
572         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
573
574         // Change the leader behavior to follower
575         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
576
577         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
578         leaderRaftActor.setDropMessageOfType(null);
579         leaderActor.tell(snapshotReply, leaderActor);
580
581         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
582         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
583
584         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
585
586         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
587     }
588
589     @Test
590     public void testAddServerWithInstallSnapshotTimeout() {
591         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
592
593         setupNewFollower();
594         RaftActorContext initialActorContext = new MockRaftActorContext();
595
596         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
597                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
598                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
599                 actorFactory.generateActorId(LEADER_ID));
600
601         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
602         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
603         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
604
605         // Drop the InstallSnapshot message so it times out
606         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
607
608         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
609
610         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
611
612         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
613         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
614
615         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
616         assertEquals("Leader followers size", 0,
617                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
618
619         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
620     }
621
622     @Test
623     public void testAddServerWithNoLeader() {
624         LOG.info("testAddServerWithNoLeader starting");
625
626         setupNewFollower();
627         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
628         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
629
630         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
631                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
632                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
633                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
634                 actorFactory.generateActorId(LEADER_ID));
635         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
636
637         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
638                 testKit.getRef());
639         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
640         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
641
642         LOG.info("testAddServerWithNoLeader ending");
643     }
644
645     @Test
646     public void testAddServerWithNoConsensusReached() {
647         LOG.info("testAddServerWithNoConsensusReached starting");
648
649         setupNewFollower();
650         RaftActorContext initialActorContext = new MockRaftActorContext();
651
652         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
653                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
654                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
655                 actorFactory.generateActorId(LEADER_ID));
656
657         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
658         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
659
660         final ActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
661
662         // Drop UnInitializedFollowerSnapshotReply initially
663         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
664
665         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
666         newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
667
668         // Drop AppendEntries to the new follower so consensus isn't reached
669         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
670
671         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
672
673         // Capture the UnInitializedFollowerSnapshotReply
674         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
675
676         // Send the UnInitializedFollowerSnapshotReply to resume the first request
677         leaderRaftActor.setDropMessageOfType(null);
678         leaderActor.tell(snapshotReply, leaderActor);
679
680         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
681
682         // Send a second AddServer
683         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
684
685         // The first AddServer should succeed with OK even though consensus wasn't reached
686         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
687         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
688         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
689
690         // Verify ServerConfigurationPayload entry in leader's log
691         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
692                 votingServer(NEW_SERVER_ID));
693
694         // The second AddServer should fail since consensus wasn't reached for the first
695         addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
696         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
697
698         // Re-send the second AddServer - should also fail
699         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
700         addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
701         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
702
703         LOG.info("testAddServerWithNoConsensusReached ending");
704     }
705
706     @Test
707     public void testAddServerWithExistingServer() {
708         LOG.info("testAddServerWithExistingServer starting");
709
710         RaftActorContext initialActorContext = new MockRaftActorContext();
711
712         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
713                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
714                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
715                 actorFactory.generateActorId(LEADER_ID));
716
717         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
718
719         AddServerReply addServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"), AddServerReply.class);
720         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
721
722         LOG.info("testAddServerWithExistingServer ending");
723     }
724
725     @Test
726     public void testAddServerForwardedToLeader() {
727         LOG.info("testAddServerForwardedToLeader starting");
728
729         setupNewFollower();
730         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
731         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
732
733         ActorRef leaderActor = actorFactory.createActor(
734                 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
735
736         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
737                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
738                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
739                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
740                 actorFactory.generateActorId(FOLLOWER_ID));
741         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
742
743         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
744                 -1, -1, (short)0), leaderActor);
745
746         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
747                 testKit.getRef());
748         expectFirstMatching(leaderActor, AddServer.class);
749
750         LOG.info("testAddServerForwardedToLeader ending");
751     }
752
753     @Test
754     public void testOnApplyState() {
755         LOG.info("testOnApplyState starting");
756
757         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
758         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
759         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
760                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
761                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
762                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
763                 actorFactory.generateActorId(LEADER_ID));
764
765         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
766                 noLeaderActor.underlyingActor());
767
768         ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
769                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
770         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
771         assertEquals("Message handled", true, handled);
772
773         ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
774                 new MockRaftActorContext.MockPayload("1"));
775         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
776         assertEquals("Message handled", false, handled);
777
778         LOG.info("testOnApplyState ending");
779     }
780
781     @Test
782     public void testRemoveServerWithNoLeader() {
783         LOG.info("testRemoveServerWithNoLeader starting");
784
785         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
786         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
787
788         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
789                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
790                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
791                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
792                 actorFactory.generateActorId(LEADER_ID));
793         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
794
795         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
796         RemoveServerReply removeServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
797                 RemoveServerReply.class);
798         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
799
800         LOG.info("testRemoveServerWithNoLeader ending");
801     }
802
803     @Test
804     public void testRemoveServerNonExistentServer() {
805         LOG.info("testRemoveServerNonExistentServer starting");
806
807         RaftActorContext initialActorContext = new MockRaftActorContext();
808
809         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
810                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
811                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
812                 actorFactory.generateActorId(LEADER_ID));
813
814         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
815         RemoveServerReply removeServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
816                 RemoveServerReply.class);
817         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
818
819         LOG.info("testRemoveServerNonExistentServer ending");
820     }
821
822     @Test
823     public void testRemoveServerForwardToLeader() {
824         LOG.info("testRemoveServerForwardToLeader starting");
825
826         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
827         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
828
829         ActorRef leaderActor = actorFactory.createTestActor(
830                 MessageCollectorActor.props(), actorFactory.generateActorId(LEADER_ID));
831
832         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
833                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
834                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
835                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
836                 actorFactory.generateActorId(FOLLOWER_ID));
837         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
838
839         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
840                 -1, -1, (short)0), leaderActor);
841
842         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
843         expectFirstMatching(leaderActor, RemoveServer.class);
844
845         LOG.info("testRemoveServerForwardToLeader ending");
846     }
847
848     @Test
849     public void testRemoveServer() {
850         LOG.info("testRemoveServer starting");
851
852         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
853         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
854         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
855
856         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
857         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
858         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
859         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
860         RaftActorContext initialActorContext = new MockRaftActorContext();
861
862         final String downNodeId = "downNode";
863         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
864                 ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
865                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
866                 actorFactory.generateActorId(LEADER_ID));
867
868         final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
869
870         ActorRef follower1Collector = actorFactory.createActor(
871                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
872         final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
873                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
874                         FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
875                         follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
876
877         ActorRef follower2Collector = actorFactory.createActor(
878                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
879         final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
880                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
881                         FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
882                         follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
883
884         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
885         follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
886         follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
887
888         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
889         RemoveServerReply removeServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
890                 RemoveServerReply.class);
891         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
892
893         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
894         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
895         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
896                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
897
898         applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
899         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
900         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
901                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
902
903         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
904         assertTrue("Expected Leader", currentBehavior instanceof Leader);
905         assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
906
907         MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
908
909         LOG.info("testRemoveServer ending");
910     }
911
912     @Test
913     public void testRemoveServerLeader() {
914         LOG.info("testRemoveServerLeader starting");
915
916         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
917         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
918         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
919
920         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
921         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
922         RaftActorContext initialActorContext = new MockRaftActorContext();
923
924         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
925                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
926                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
927                 actorFactory.generateActorId(LEADER_ID));
928
929         final ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
930
931         final ActorRef followerCollector =
932                 actorFactory.createActor(MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
933         actorFactory.createTestActor(
934                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
935                         configParams, NO_PERSISTENCE, followerCollector)
936                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
937                 followerActorId);
938
939         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
940         RemoveServerReply removeServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
941                 RemoveServerReply.class);
942         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
943
944         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
945         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
946         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
947                 votingServer(FOLLOWER_ID));
948
949         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
950
951         LOG.info("testRemoveServerLeader ending");
952     }
953
954     @Test
955     public void testRemoveServerLeaderWithNoFollowers() {
956         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
957
958         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
959                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
960                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
961                 actorFactory.generateActorId(LEADER_ID));
962
963         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
964         RemoveServerReply removeServerReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
965                 RemoveServerReply.class);
966         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
967
968         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
969     }
970
971     @Test
972     public void testChangeServersVotingStatus() {
973         LOG.info("testChangeServersVotingStatus starting");
974
975         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
976         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
977         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
978
979         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
980         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
981         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
982         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
983
984         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
985                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
986                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
987                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
988         ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
989
990         ActorRef follower1Collector = actorFactory.createActor(
991                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
992         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
993                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
994                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
995                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
996
997         ActorRef follower2Collector = actorFactory.createActor(
998                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
999         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1000                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1001                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1002                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1003
1004         // Send first ChangeServersVotingStatus message
1005
1006         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
1007                 testKit.getRef());
1008         ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1009         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1010
1011         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1012         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1013         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1014                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1015
1016         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1017         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1018                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1019                 nonVotingServer(FOLLOWER_ID2));
1020
1021         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1022         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1023                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1024                 nonVotingServer(FOLLOWER_ID2));
1025
1026         MessageCollectorActor.clearMessages(leaderCollector);
1027         MessageCollectorActor.clearMessages(follower1Collector);
1028         MessageCollectorActor.clearMessages(follower2Collector);
1029
1030         // Send second ChangeServersVotingStatus message
1031
1032         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1033         reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1034         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1035
1036         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1037         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1038                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1039
1040         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1041         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1042                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1043
1044         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1045         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1046                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1047
1048         LOG.info("testChangeServersVotingStatus ending");
1049     }
1050
1051     @Test
1052     public void testChangeLeaderToNonVoting() {
1053         LOG.info("testChangeLeaderToNonVoting starting");
1054
1055         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1056         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1057
1058         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1059         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1060         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1061         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1062
1063         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1064                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1065                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1066                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1067         ActorRef leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1068
1069         ActorRef follower1Collector = actorFactory.createActor(
1070                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1071         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1072                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1073                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1074                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1075
1076         ActorRef follower2Collector = actorFactory.createActor(
1077                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1078         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1079                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1080                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1081                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1082
1083         // Send ChangeServersVotingStatus message
1084
1085         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1086         ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1087         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1088
1089         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1090         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1091                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1092
1093         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1094         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1095                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1096
1097         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1098         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1099                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1100
1101         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1102         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1103
1104         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1105
1106         LOG.info("testChangeLeaderToNonVoting ending");
1107     }
1108
1109     @Test
1110     public void testChangeLeaderToNonVotingInSingleNode() {
1111         LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1112
1113         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1114                 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
1115                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1116
1117         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1118         ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1119         assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1120
1121         LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1122     }
1123
1124     @Test
1125     public void testChangeToVotingWithNoLeader() {
1126         LOG.info("testChangeToVotingWithNoLeader starting");
1127
1128         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1129         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1130         configParams.setElectionTimeoutFactor(5);
1131
1132         final String node1ID = "node1";
1133         final String node2ID = "node2";
1134
1135         // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1136         // via the server config. The server config will also contain 2 voting peers that are down (ie no
1137         // actors created).
1138
1139         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1140                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1141                 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1142         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1143
1144         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1145         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1146         InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1147         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1148         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1149         InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1150
1151         ActorRef node1Collector = actorFactory.createActor(
1152                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1153         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1154                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1155                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1156         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1157
1158         ActorRef node2Collector = actorFactory.createActor(
1159                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1160         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1161                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1162                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1163         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1164
1165         node1RaftActor.waitForInitializeBehaviorComplete();
1166         node2RaftActor.waitForInitializeBehaviorComplete();
1167
1168         // Verify the intended server config was loaded and applied.
1169         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1170                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1171                 votingServer("downNode2"));
1172         assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1173         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1174         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1175
1176         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1177                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1178                 votingServer("downNode2"));
1179         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1180
1181         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1182         // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1183         // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1184         // down nodes are switched to non-voting, node1 should only need a vote from node2.
1185
1186         // First send the message such that node1 has no peer address for node2 - should fail.
1187
1188         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1189                 node2ID, true, "downNode1", false, "downNode2", false));
1190         node1RaftActorRef.tell(changeServers, testKit.getRef());
1191         ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1192         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1193         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1194
1195         // Send an AppendEntries so node1 has a leaderId
1196
1197         long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1198         node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1199                 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1200
1201         // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the next
1202         // ChangeServersVotingStatus message, it will try to elect a leader.
1203
1204         AbstractRaftActorIntegrationTest.verifyRaftState(node1RaftActorRef,
1205             rs -> assertEquals("getLeader", null, rs.getLeader()));
1206
1207         // Update node2's peer address and send the message again
1208
1209         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1210
1211         node1RaftActorRef.tell(changeServers, testKit.getRef());
1212         reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1213         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1214
1215         ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1216                 ApplyJournalEntries.class);
1217         assertEquals("getToIndex", 1, apply.getToIndex());
1218         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1219                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1220                 nonVotingServer("downNode2"));
1221         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1222         assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1223
1224         apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1225         assertEquals("getToIndex", 1, apply.getToIndex());
1226         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1227                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1228                 nonVotingServer("downNode2"));
1229         assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1230         assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1231
1232         LOG.info("testChangeToVotingWithNoLeader ending");
1233     }
1234
1235     @Test
1236     public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1237         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1238
1239         final String node1ID = "node1";
1240         final String node2ID = "node2";
1241
1242         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1243                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1244                         ? actorFactory.createTestActorPath(node2ID) : null;
1245
1246         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1247                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1248         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1249
1250         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1251         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1252         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1253         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1254
1255         DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1256         configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1257         configParams1.setElectionTimeoutFactor(1);
1258         configParams1.setPeerAddressResolver(peerAddressResolver);
1259         ActorRef node1Collector = actorFactory.createActor(
1260                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1261         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1262                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1263                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1264         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1265
1266         DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1267         configParams2.setElectionTimeoutFactor(1000000);
1268         configParams2.setPeerAddressResolver(peerAddressResolver);
1269         ActorRef node2Collector = actorFactory.createActor(
1270                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1271         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1272                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1273                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1274         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1275
1276         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1277         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1278         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1279         // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1280         // node2 was previously voting.
1281
1282         node2RaftActor.setDropMessageOfType(RequestVote.class);
1283
1284         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1285         node1RaftActorRef.tell(changeServers, testKit.getRef());
1286         ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1287         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1288
1289         assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1290                 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1291         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1292
1293         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1294     }
1295
1296     @Test
1297     public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1298         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1299
1300         final String node1ID = "node1";
1301         final String node2ID = "node2";
1302
1303         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1304                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1305                         ? actorFactory.createTestActorPath(node2ID) : null;
1306
1307         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1308         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1309         configParams.setElectionTimeoutFactor(3);
1310         configParams.setPeerAddressResolver(peerAddressResolver);
1311
1312         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1313                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1314         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1315
1316         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1317         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1318         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1319         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1320         InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1321                 new MockRaftActorContext.MockPayload("2")));
1322         InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1323
1324         ActorRef node1Collector = actorFactory.createActor(
1325                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1326         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1327                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1328                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1329         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1330
1331         ActorRef node2Collector = actorFactory.createActor(
1332                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1333         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1334                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1335                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1336         final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1337
1338         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1339         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1340         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1341         // forward the request to node2.
1342
1343         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1344                 ImmutableMap.of(node1ID, true, node2ID, true));
1345         node1RaftActorRef.tell(changeServers, testKit.getRef());
1346         ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1347         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1348
1349         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1350         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1351                 votingServer(node1ID), votingServer(node2ID));
1352         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1353
1354         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1355         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1356                 votingServer(node1ID), votingServer(node2ID));
1357         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1358         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1359
1360         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1361     }
1362
1363     @Test
1364     public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1365         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1366
1367         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1368         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1369         configParams.setElectionTimeoutFactor(100000);
1370
1371         final String node1ID = "node1";
1372         final String node2ID = "node2";
1373
1374         configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1375                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1376                         ? actorFactory.createTestActorPath(node2ID) : null);
1377
1378         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1379                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1380         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1381
1382         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1383         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1384         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1385         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1386
1387         ActorRef node1Collector = actorFactory.createActor(
1388                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1389         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1390                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1391                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1392         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1393
1394         ActorRef node2Collector = actorFactory.createActor(
1395                 MessageCollectorActor.props(), actorFactory.generateActorId("collector"));
1396         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1397                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1398                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1399         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1400
1401         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1402         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1403         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1404         // request to node2 when node2 is elected.
1405
1406         node2RaftActor.setDropMessageOfType(RequestVote.class);
1407
1408         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1409                 node2ID, true));
1410         node1RaftActorRef.tell(changeServers, testKit.getRef());
1411
1412         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1413
1414         node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1415
1416         ServerChangeReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ServerChangeReply.class);
1417         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1418
1419         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1420         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1421                 votingServer(node1ID), votingServer(node2ID));
1422         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1423         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1424
1425         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1426         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1427                 votingServer(node1ID), votingServer(node2ID));
1428         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1429
1430         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1431     }
1432
1433     private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1434         Stopwatch sw = Stopwatch.createStarted();
1435         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1436             for (RaftActor raftActor : raftActors) {
1437                 if (raftActor.getRaftState() == expState) {
1438                     return;
1439                 }
1440             }
1441         }
1442
1443         fail("None of the RaftActors have state " + expState);
1444     }
1445
1446     private static ServerInfo votingServer(String id) {
1447         return new ServerInfo(id, true);
1448     }
1449
1450     private static ServerInfo nonVotingServer(String id) {
1451         return new ServerInfo(id, false);
1452     }
1453
1454     private ActorRef newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1455         return newCollectorActor(leaderRaftActor, LEADER_ID);
1456     }
1457
1458     private ActorRef newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1459         ActorRef collectorActor = actorFactory.createTestActor(
1460                 MessageCollectorActor.props(), actorFactory.generateActorId(id + "Collector"));
1461         raftActor.setCollectorActor(collectorActor);
1462         return collectorActor;
1463     }
1464
1465     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1466         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1467         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1468         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1469         assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1470     }
1471
1472     private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1473         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1474         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1475         configParams.setElectionTimeoutFactor(100000);
1476         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
1477         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1478         termInfo.update(1, LEADER_ID);
1479         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1480                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
1481                 noPersistence, applyState -> actor.tell(applyState, actor), LOG);
1482     }
1483
1484     abstract static class AbstractMockRaftActor extends MockRaftActor {
1485         private volatile ActorRef collectorActor;
1486         private volatile Class<?> dropMessageOfType;
1487
1488         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1489                 boolean persistent, ActorRef collectorActor) {
1490             super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
1491                     .persistent(Optional.of(persistent)));
1492             this.collectorActor = collectorActor;
1493         }
1494
1495         void setDropMessageOfType(Class<?> dropMessageOfType) {
1496             this.dropMessageOfType = dropMessageOfType;
1497         }
1498
1499         void setCollectorActor(ActorRef collectorActor) {
1500             this.collectorActor = collectorActor;
1501         }
1502
1503         @Override
1504         public void handleCommand(Object message) {
1505             if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1506                 super.handleCommand(message);
1507             }
1508
1509             if (collectorActor != null) {
1510                 collectorActor.tell(message, getSender());
1511             }
1512         }
1513     }
1514
1515     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1516
1517         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1518                 boolean persistent, ActorRef collectorActor) {
1519             super(id, peerAddresses, config, persistent, collectorActor);
1520             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1521                 @Override
1522                 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
1523                     actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
1524                 }
1525
1526                 @Override
1527                 public void applySnapshot(
1528                         org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
1529                 }
1530
1531                 @Override
1532                 public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
1533                         ByteSource snapshotBytes) {
1534                     throw new UnsupportedOperationException();
1535                 }
1536             };
1537         }
1538
1539         public static Props props(final String id, final Map<String, String> peerAddresses,
1540                 ConfigParams config, boolean persistent, ActorRef collectorActor) {
1541
1542             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1543                     persistent, collectorActor);
1544         }
1545
1546     }
1547
1548     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1549         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1550                 RaftActorContext fromContext) {
1551             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1552             setPersistence(false);
1553
1554             RaftActorContext context = getRaftActorContext();
1555             for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1556                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1557                 getState().add(entry.getData());
1558                 context.getReplicatedLog().append(entry);
1559             }
1560
1561             context.setCommitIndex(fromContext.getCommitIndex());
1562             context.setLastApplied(fromContext.getLastApplied());
1563             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1564                     fromContext.getTermInformation().getVotedFor());
1565         }
1566
1567         @Override
1568         protected void initializeBehavior() {
1569             changeCurrentBehavior(new Leader(getRaftActorContext()));
1570             initializeBehaviorComplete.countDown();
1571         }
1572
1573         @Override
1574         @SuppressWarnings("checkstyle:IllegalCatch")
1575         public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
1576             MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
1577             if (installSnapshotStream.isPresent()) {
1578                 SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
1579             }
1580
1581             actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
1582         }
1583
1584         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1585             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1586             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1587             configParams.setElectionTimeoutFactor(10);
1588             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1589         }
1590     }
1591
1592     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1593         public MockNewFollowerRaftActor(ConfigParams config, ActorRef collectorActor) {
1594             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE,
1595                     collectorActor);
1596             setPersistence(false);
1597         }
1598
1599         static Props props(ConfigParams config, ActorRef collectorActor) {
1600             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1601         }
1602     }
1603 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.