Bug 7362: Notify applyState synchronously
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17
18 import akka.actor.ActorRef;
19 import akka.actor.Props;
20 import akka.actor.UntypedActor;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.collect.Maps;
28 import com.google.common.collect.Sets;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
40 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
41 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
42 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
44 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
45 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
46 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
47 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
48 import org.opendaylight.controller.cluster.raft.messages.AddServer;
49 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
50 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
51 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
52 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
54 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
55 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
56 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
57 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
58 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
59 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
61 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
62 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
63 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
64 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
65 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
66 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
67 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
68 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
69 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72 import scala.concurrent.duration.FiniteDuration;
73
74 /**
75  * Unit tests for RaftActorServerConfigurationSupport.
76  *
77  * @author Thomas Pantelis
78  */
79 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
80     static final String LEADER_ID = "leader";
81     static final String FOLLOWER_ID = "follower";
82     static final String FOLLOWER_ID2 = "follower2";
83     static final String NEW_SERVER_ID = "new-server";
84     static final String NEW_SERVER_ID2 = "new-server2";
85     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
86     private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
87     private static final boolean NO_PERSISTENCE = false;
88     private static final boolean PERSISTENT = true;
89
90     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
91
92     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
93             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
94             actorFactory.generateActorId(FOLLOWER_ID));
95
96     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
97     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
98     private RaftActorContext newFollowerActorContext;
99
100     private final JavaTestKit testKit = new JavaTestKit(getSystem());
101
102     @Before
103     public void setup() {
104         InMemoryJournal.clear();
105         InMemorySnapshotStore.clear();
106     }
107
108     @SuppressWarnings("checkstyle:IllegalCatch")
109     private void setupNewFollower() {
110         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
111
112         newFollowerCollectorActor = actorFactory.createTestActor(
113                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
114                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
115         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
116                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
117                 actorFactory.generateActorId(NEW_SERVER_ID));
118
119         try {
120             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
121         } catch (Exception e) {
122             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
123         }
124     }
125
126     private static DefaultConfigParamsImpl newFollowerConfigParams() {
127         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
128         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
129         configParams.setElectionTimeoutFactor(100000);
130         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
131         return configParams;
132     }
133
134     @After
135     public void tearDown() throws Exception {
136         actorFactory.close();
137     }
138
139     @Test
140     public void testAddServerWithExistingFollower() throws Exception {
141         LOG.info("testAddServerWithExistingFollower starting");
142         setupNewFollower();
143         RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
144         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
145                 0, 3, 1).build());
146         followerActorContext.setCommitIndex(2);
147         followerActorContext.setLastApplied(2);
148
149         Follower follower = new Follower(followerActorContext);
150         followerActor.underlyingActor().setBehavior(follower);
151         followerActorContext.setCurrentBehavior(follower);
152
153         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
154                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
155                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
156                 actorFactory.generateActorId(LEADER_ID));
157
158         // Expect initial heartbeat from the leader.
159         expectFirstMatching(followerActor, AppendEntries.class);
160         clearMessages(followerActor);
161
162         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
163         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
164
165         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
166
167         // Leader should install snapshot - capture and verify ApplySnapshot contents
168
169         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
170         @SuppressWarnings("unchecked")
171         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
172         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
173
174         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
175         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
176         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
177
178         // Verify ServerConfigurationPayload entry in leader's log
179
180         expectFirstMatching(leaderCollectorActor, ApplyState.class);
181         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
182         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
183         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
184         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
185         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
186                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
187
188         // Verify ServerConfigurationPayload entry in both followers
189
190         expectFirstMatching(followerActor, ApplyState.class);
191         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
192         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
193                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
194
195         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
196         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
197         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
198                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
199
200         // Verify new server config was applied in both followers
201
202         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
203
204         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
205                 newFollowerActorContext.getPeerIds());
206
207         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
208         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
209         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
210         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
211
212         assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
213                 InMemoryJournal.get(LEADER_ID, SimpleReplicatedLogEntry.class).size());
214         assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
215                 InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
216
217         assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
218                 InMemoryJournal.get(NEW_SERVER_ID, SimpleReplicatedLogEntry.class).size());
219         assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
220                 InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
221
222         LOG.info("testAddServerWithExistingFollower ending");
223     }
224
225     @Test
226     public void testAddServerWithNoExistingFollower() throws Exception {
227         LOG.info("testAddServerWithNoExistingFollower starting");
228
229         setupNewFollower();
230         RaftActorContext initialActorContext = new MockRaftActorContext();
231         initialActorContext.setCommitIndex(1);
232         initialActorContext.setLastApplied(1);
233         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
234                 0, 2, 1).build());
235
236         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
237                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
238                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
239                 actorFactory.generateActorId(LEADER_ID));
240
241         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
242         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
243
244         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
245
246         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
247
248         // Leader should install snapshot - capture and verify ApplySnapshot contents
249
250         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
251         @SuppressWarnings("unchecked")
252         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
253         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
254
255         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
256         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
257         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
258
259         // Verify ServerConfigurationPayload entry in leader's log
260
261         expectFirstMatching(leaderCollectorActor, ApplyState.class);
262         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
263         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
264         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
265         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
266                 votingServer(NEW_SERVER_ID));
267
268         // Verify ServerConfigurationPayload entry in the new follower
269
270         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
271         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
272         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
273                 votingServer(NEW_SERVER_ID));
274
275         // Verify new server config was applied in the new follower
276
277         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
278
279         LOG.info("testAddServerWithNoExistingFollower ending");
280     }
281
282     @Test
283     public void testAddServersAsNonVoting() throws Exception {
284         LOG.info("testAddServersAsNonVoting starting");
285
286         setupNewFollower();
287         RaftActorContext initialActorContext = new MockRaftActorContext();
288
289         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
290                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
291                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
292                 actorFactory.generateActorId(LEADER_ID));
293
294         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
295         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
296
297         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
298
299         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
300
301         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
302         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
303         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
304
305         // Verify ServerConfigurationPayload entry in leader's log
306
307         expectFirstMatching(leaderCollectorActor, ApplyState.class);
308
309         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
310         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
311         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
312         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
313                 nonVotingServer(NEW_SERVER_ID));
314
315         // Verify ServerConfigurationPayload entry in the new follower
316
317         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
318         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
319         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
320                 nonVotingServer(NEW_SERVER_ID));
321
322         // Verify new server config was applied in the new follower
323
324         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
325
326         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
327
328         // Add another non-voting server.
329
330         clearMessages(leaderCollectorActor);
331
332         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
333         Follower newFollower2 = new Follower(follower2ActorContext);
334         followerActor.underlyingActor().setBehavior(newFollower2);
335
336         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
337
338         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
339         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
340         assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
341
342         expectFirstMatching(leaderCollectorActor, ApplyState.class);
343         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
344         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
345         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
346         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
347                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
348
349         LOG.info("testAddServersAsNonVoting ending");
350     }
351
352     @Test
353     public void testAddServerWithOperationInProgress() throws Exception {
354         LOG.info("testAddServerWithOperationInProgress starting");
355
356         setupNewFollower();
357         RaftActorContext initialActorContext = new MockRaftActorContext();
358
359         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
360                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
361                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
362                 actorFactory.generateActorId(LEADER_ID));
363
364         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
365         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
366
367         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
368
369         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
370         Follower newFollower2 = new Follower(follower2ActorContext);
371         followerActor.underlyingActor().setBehavior(newFollower2);
372
373         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
374         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
375
376         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
377
378         // Wait for leader's install snapshot and capture it
379
380         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
381
382         // Send a second AddServer - should get queued
383         JavaTestKit testKit2 = new JavaTestKit(getSystem());
384         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
385
386         // Continue the first AddServer
387         newFollowerRaftActorInstance.setDropMessageOfType(null);
388         newFollowerRaftActor.tell(installSnapshot, leaderActor);
389
390         // Verify both complete successfully
391         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
392         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
393
394         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
395         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
396
397         // Verify ServerConfigurationPayload entries in leader's log
398
399         expectMatching(leaderCollectorActor, ApplyState.class, 2);
400         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
401         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
402         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
403         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
404                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
405
406         // Verify ServerConfigurationPayload entry in the new follower
407
408         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
409         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
410                newFollowerActorContext.getPeerIds());
411
412         LOG.info("testAddServerWithOperationInProgress ending");
413     }
414
415     @Test
416     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
417         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
418
419         setupNewFollower();
420         RaftActorContext initialActorContext = new MockRaftActorContext();
421
422         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
423                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
424                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
425                 actorFactory.generateActorId(LEADER_ID));
426
427         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
428         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
429
430         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
431
432         // Drop commit message for now to delay snapshot completion
433         leaderRaftActor.setDropMessageOfType(String.class);
434
435         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
436
437         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
438
439         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
440
441         leaderRaftActor.setDropMessageOfType(null);
442         leaderActor.tell(commitMsg, leaderActor);
443
444         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
445         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
446         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
447
448         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
449
450         // Verify ServerConfigurationPayload entry in leader's log
451
452         expectFirstMatching(leaderCollectorActor, ApplyState.class);
453         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
454         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
455         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
456         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
457                 votingServer(NEW_SERVER_ID));
458
459         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
460     }
461
462     @Test
463     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
464         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
465
466         setupNewFollower();
467         RaftActorContext initialActorContext = new MockRaftActorContext();
468
469         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
470                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
471                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
472                 actorFactory.generateActorId(LEADER_ID));
473
474         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
475         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
476
477         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
478
479         // Drop commit message so the snapshot doesn't complete.
480         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
481
482         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
483
484         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
485
486         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
487         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
488
489         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
490
491         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
492     }
493
494     @Test
495     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
496         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
497
498         setupNewFollower();
499         RaftActorContext initialActorContext = new MockRaftActorContext();
500
501         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
502                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
503                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
504                 actorFactory.generateActorId(LEADER_ID));
505
506         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
507         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
508         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
509
510         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
511
512         // Drop the commit message so the snapshot doesn't complete yet.
513         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
514
515         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
516
517         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
518
519         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
520
521         // Change the leader behavior to follower
522         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
523
524         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
525         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
526         // isCapturing assertion below.
527         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
528
529         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
530         leaderActor.tell(commitMsg, leaderActor);
531
532         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
533
534         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
535         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
536
537         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
538         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
539
540         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
541     }
542
543     @Test
544     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
545         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
546
547         setupNewFollower();
548         RaftActorContext initialActorContext = new MockRaftActorContext();
549
550         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
551                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
552                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
553                 actorFactory.generateActorId(LEADER_ID));
554
555         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
556         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
557
558         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
559
560         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
561
562         // Drop the UnInitializedFollowerSnapshotReply to delay it.
563         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
564
565         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
566
567         final UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
568                 UnInitializedFollowerSnapshotReply.class);
569
570         // Prevent election timeout when the leader switches to follower
571         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
572
573         // Change the leader behavior to follower
574         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
575
576         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
577         leaderRaftActor.setDropMessageOfType(null);
578         leaderActor.tell(snapshotReply, leaderActor);
579
580         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
581         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
582
583         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
584
585         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
586     }
587
588     @Test
589     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
590         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
591
592         setupNewFollower();
593         RaftActorContext initialActorContext = new MockRaftActorContext();
594
595         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
596                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
597                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
598                 actorFactory.generateActorId(LEADER_ID));
599
600         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
601         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
602         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
603
604         // Drop the InstallSnapshot message so it times out
605         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
606
607         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
608
609         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
610
611         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
612         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
613
614         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
615         assertEquals("Leader followers size", 0,
616                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
617
618         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
619     }
620
621     @Test
622     public void testAddServerWithNoLeader() {
623         LOG.info("testAddServerWithNoLeader starting");
624
625         setupNewFollower();
626         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
627         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
628
629         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
630                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
631                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
632                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
633                 actorFactory.generateActorId(LEADER_ID));
634         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
635
636         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
637                 testKit.getRef());
638         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
639         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
640
641         LOG.info("testAddServerWithNoLeader ending");
642     }
643
644     @Test
645     public void testAddServerWithNoConsensusReached() {
646         LOG.info("testAddServerWithNoConsensusReached starting");
647
648         setupNewFollower();
649         RaftActorContext initialActorContext = new MockRaftActorContext();
650
651         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
652                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
653                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
654                 actorFactory.generateActorId(LEADER_ID));
655
656         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
657         final RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
658
659         final TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
660
661         // Drop UnInitializedFollowerSnapshotReply initially
662         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
663
664         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
665         newFollowerCollectorActor = newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
666
667         // Drop AppendEntries to the new follower so consensus isn't reached
668         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
669
670         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
671
672         // Capture the UnInitializedFollowerSnapshotReply
673         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
674
675         // Send the UnInitializedFollowerSnapshotReply to resume the first request
676         leaderRaftActor.setDropMessageOfType(null);
677         leaderActor.tell(snapshotReply, leaderActor);
678
679         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
680
681         // Send a second AddServer
682         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
683
684         // The first AddServer should succeed with OK even though consensus wasn't reached
685         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
686         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
687         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
688
689         // Verify ServerConfigurationPayload entry in leader's log
690         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
691                 votingServer(NEW_SERVER_ID));
692
693         // The second AddServer should fail since consensus wasn't reached for the first
694         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
695         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
696
697         // Re-send the second AddServer - should also fail
698         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
699         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
700         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
701
702         LOG.info("testAddServerWithNoConsensusReached ending");
703     }
704
705     @Test
706     public void testAddServerWithExistingServer() {
707         LOG.info("testAddServerWithExistingServer starting");
708
709         RaftActorContext initialActorContext = new MockRaftActorContext();
710
711         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
712                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
713                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
714                 actorFactory.generateActorId(LEADER_ID));
715
716         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
717
718         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
719         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
720
721         LOG.info("testAddServerWithExistingServer ending");
722     }
723
724     @Test
725     public void testAddServerForwardedToLeader() {
726         LOG.info("testAddServerForwardedToLeader starting");
727
728         setupNewFollower();
729         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
730         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
731
732         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
733                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
734                 actorFactory.generateActorId(LEADER_ID));
735
736         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
737                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
738                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
739                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
740                 actorFactory.generateActorId(FOLLOWER_ID));
741         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
742
743         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
744                 -1, -1, (short)0), leaderActor);
745
746         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true),
747                 testKit.getRef());
748         expectFirstMatching(leaderActor, AddServer.class);
749
750         LOG.info("testAddServerForwardedToLeader ending");
751     }
752
753     @Test
754     public void testOnApplyState() {
755         LOG.info("testOnApplyState starting");
756
757         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
758         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
759         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
760                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
761                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
762                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
763                 actorFactory.generateActorId(LEADER_ID));
764
765         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
766                 noLeaderActor.underlyingActor());
767
768         ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
769                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
770         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
771         assertEquals("Message handled", true, handled);
772
773         ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
774                 new MockRaftActorContext.MockPayload("1"));
775         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
776         assertEquals("Message handled", false, handled);
777
778         LOG.info("testOnApplyState ending");
779     }
780
781     @Test
782     public void testRemoveServerWithNoLeader() {
783         LOG.info("testRemoveServerWithNoLeader starting");
784
785         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
786         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
787
788         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
789                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
790                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false))
791                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
792                 actorFactory.generateActorId(LEADER_ID));
793         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
794
795         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
796         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
797                 RemoveServerReply.class);
798         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
799
800         LOG.info("testRemoveServerWithNoLeader ending");
801     }
802
803     @Test
804     public void testRemoveServerNonExistentServer() {
805         LOG.info("testRemoveServerNonExistentServer starting");
806
807         RaftActorContext initialActorContext = new MockRaftActorContext();
808
809         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
810                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
811                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
812                 actorFactory.generateActorId(LEADER_ID));
813
814         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
815         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
816                 RemoveServerReply.class);
817         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
818
819         LOG.info("testRemoveServerNonExistentServer ending");
820     }
821
822     @Test
823     public void testRemoveServerForwardToLeader() {
824         LOG.info("testRemoveServerForwardToLeader starting");
825
826         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
827         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
828
829         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
830                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
831                 actorFactory.generateActorId(LEADER_ID));
832
833         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
834                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
835                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false))
836                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
837                 actorFactory.generateActorId(FOLLOWER_ID));
838         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
839
840         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
841                 -1, -1, (short)0), leaderActor);
842
843         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
844         expectFirstMatching(leaderActor, RemoveServer.class);
845
846         LOG.info("testRemoveServerForwardToLeader ending");
847     }
848
849     @Test
850     public void testRemoveServer() {
851         LOG.info("testRemoveServer starting");
852
853         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
854         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
855         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
856
857         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
858         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
859         RaftActorContext initialActorContext = new MockRaftActorContext();
860
861         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
862                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
863                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
864                 actorFactory.generateActorId(LEADER_ID));
865
866         final TestActorRef<MessageCollectorActor> leaderCollector =
867                 newLeaderCollectorActor(leaderActor.underlyingActor());
868
869         TestActorRef<MessageCollectorActor> collector = actorFactory.createTestActor(MessageCollectorActor.props()
870                 .withDispatcher(Dispatchers.DefaultDispatcherId()),
871                         actorFactory.generateActorId("collector"));
872         actorFactory.createTestActor(
873                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
874                         configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
875                 followerActorId);
876
877         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
878         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
879                 RemoveServerReply.class);
880         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
881
882         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
883         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
884         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
885                 votingServer(LEADER_ID));
886
887         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
888         assertTrue("Expected Leader", currentBehavior instanceof Leader);
889         assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
890
891         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
892
893         LOG.info("testRemoveServer ending");
894     }
895
896     @Test
897     public void testRemoveServerLeader() {
898         LOG.info("testRemoveServerLeader starting");
899
900         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
901         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
902         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
903
904         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
905         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
906         RaftActorContext initialActorContext = new MockRaftActorContext();
907
908         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
909                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
910                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
911                 actorFactory.generateActorId(LEADER_ID));
912
913         final TestActorRef<MessageCollectorActor> leaderCollector =
914                 newLeaderCollectorActor(leaderActor.underlyingActor());
915
916         final TestActorRef<MessageCollectorActor> followerCollector =
917                 actorFactory.createTestActor(MessageCollectorActor.props()
918                 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
919         actorFactory.createTestActor(
920                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
921                         configParams, NO_PERSISTENCE, followerCollector)
922                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
923                 followerActorId);
924
925         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
926         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
927                 RemoveServerReply.class);
928         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
929
930         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
931         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
932         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
933                 votingServer(FOLLOWER_ID));
934
935         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
936
937         LOG.info("testRemoveServerLeader ending");
938     }
939
940     @Test
941     public void testRemoveServerLeaderWithNoFollowers() {
942         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
943
944         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
945                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
946                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
947                 actorFactory.generateActorId(LEADER_ID));
948
949         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
950         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
951                 RemoveServerReply.class);
952         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
953
954         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
955     }
956
957     @Test
958     public void testChangeServersVotingStatus() {
959         LOG.info("testChangeServersVotingStatus starting");
960
961         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
962         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
963         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
964
965         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
966         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
967         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
968         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
969
970         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
971                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
972                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
973                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
974         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
975
976         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
977                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
978                 actorFactory.generateActorId("collector"));
979         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
980                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
981                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
982                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
983
984         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
985                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
986                 actorFactory.generateActorId("collector"));
987         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
988                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
989                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
990                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
991
992         // Send first ChangeServersVotingStatus message
993
994         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
995                 testKit.getRef());
996         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
997         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
998
999         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1000         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
1001         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1002                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1003
1004         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1005         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1006                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1007                 nonVotingServer(FOLLOWER_ID2));
1008
1009         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1010         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1011                 .getReplicatedLog(), votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID),
1012                 nonVotingServer(FOLLOWER_ID2));
1013
1014         MessageCollectorActor.clearMessages(leaderCollector);
1015         MessageCollectorActor.clearMessages(follower1Collector);
1016         MessageCollectorActor.clearMessages(follower2Collector);
1017
1018         // Send second ChangeServersVotingStatus message
1019
1020         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1021         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1022         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1023
1024         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1025         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1026                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1027
1028         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1029         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1030                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1031
1032         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1033         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1034                 .getReplicatedLog(), votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1035
1036         LOG.info("testChangeServersVotingStatus ending");
1037     }
1038
1039     @Test
1040     public void testChangeLeaderToNonVoting() {
1041         LOG.info("testChangeLeaderToNonVoting starting");
1042
1043         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1044         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1045
1046         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1047         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1048         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1049         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1050
1051         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1052                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1053                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext())
1054                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1055         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1056
1057         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1058                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1059                 actorFactory.generateActorId("collector"));
1060         final TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1061                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1062                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector)
1063                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1064
1065         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1066                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1067                 actorFactory.generateActorId("collector"));
1068         final TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1069                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1070                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector)
1071                         .withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1072
1073         // Send ChangeServersVotingStatus message
1074
1075         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1076         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1077         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1078
1079         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1080         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1081                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1082
1083         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1084         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext()
1085                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1086
1087         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1088         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext()
1089                 .getReplicatedLog(), nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1090
1091         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1092         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1093
1094         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1095
1096         LOG.info("testChangeLeaderToNonVoting ending");
1097     }
1098
1099     @Test
1100     public void testChangeLeaderToNonVotingInSingleNode() {
1101         LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1102
1103         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1104                 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext())
1105                         .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1106
1107         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1108         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1109         assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1110
1111         LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1112     }
1113
1114     @Test
1115     public void testChangeToVotingWithNoLeader() {
1116         LOG.info("testChangeToVotingWithNoLeader starting");
1117
1118         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1119         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1120         configParams.setElectionTimeoutFactor(5);
1121
1122         final String node1ID = "node1";
1123         final String node2ID = "node2";
1124
1125         // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1126         // via the server config. The server config will also contain 2 voting peers that are down (ie no
1127         // actors created).
1128
1129         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1130                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1131                 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1132         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1133
1134         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1135         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1136         InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
1137         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1138         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1139         InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
1140
1141         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1142                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1143                 actorFactory.generateActorId("collector"));
1144         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1145                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1146                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1147         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1148
1149         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1150                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1151                 actorFactory.generateActorId("collector"));
1152         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1153                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1154                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1155         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1156
1157         node1RaftActor.waitForInitializeBehaviorComplete();
1158         node2RaftActor.waitForInitializeBehaviorComplete();
1159
1160         // Verify the intended server config was loaded and applied.
1161         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1162                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1163                 votingServer("downNode2"));
1164         assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1165         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1166         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1167
1168         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1169                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1170                 votingServer("downNode2"));
1171         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1172
1173         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1174         // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1175         // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1176         // down nodes are switched to non-voting, node1 should only need a vote from node2.
1177
1178         // First send the message such that node1 has no peer address for node2 - should fail.
1179
1180         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1181                 node2ID, true, "downNode1", false, "downNode2", false));
1182         node1RaftActorRef.tell(changeServers, testKit.getRef());
1183         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1184         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1185
1186         // Send an AppendEntries so node1 has a leaderId
1187
1188         MessageCollectorActor.clearMessages(node1Collector);
1189
1190         long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
1191         node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
1192                 Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
1193
1194         // Wait for the ElectionTimeout to clear the leaderId. he leaderId must be null so on the
1195         // ChangeServersVotingStatus message, it will try to elect a leader.
1196
1197         MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);
1198
1199         // Update node2's peer address and send the message again
1200
1201         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1202
1203         node1RaftActorRef.tell(changeServers, testKit.getRef());
1204         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1205         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1206
1207         ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
1208                 ApplyJournalEntries.class);
1209         assertEquals("getToIndex", 1, apply.getToIndex());
1210         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1211                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1212                 nonVotingServer("downNode2"));
1213         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1214         assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1215
1216         apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1217         assertEquals("getToIndex", 1, apply.getToIndex());
1218         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1219                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1220                 nonVotingServer("downNode2"));
1221         assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1222         assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1223
1224         LOG.info("testChangeToVotingWithNoLeader ending");
1225     }
1226
1227     @Test
1228     public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1229         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1230
1231         final String node1ID = "node1";
1232         final String node2ID = "node2";
1233
1234         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1235                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1236                         ? actorFactory.createTestActorPath(node2ID) : null;
1237
1238         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1239                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1240         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1241
1242         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1243         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1244         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1245         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1246
1247         DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1248         configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1249         configParams1.setElectionTimeoutFactor(1);
1250         configParams1.setPeerAddressResolver(peerAddressResolver);
1251         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1252                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1253                 actorFactory.generateActorId("collector"));
1254         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1255                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1256                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1257         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1258
1259         DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1260         configParams2.setElectionTimeoutFactor(1000000);
1261         configParams2.setPeerAddressResolver(peerAddressResolver);
1262         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1263                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1264                 actorFactory.generateActorId("collector"));
1265         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1266                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1267                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1268         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1269
1270         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1271         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1272         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1273         // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1274         // node2 was previously voting.
1275
1276         node2RaftActor.setDropMessageOfType(RequestVote.class);
1277
1278         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1279         node1RaftActorRef.tell(changeServers, testKit.getRef());
1280         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1281         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1282
1283         assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1284                 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1285         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1286
1287         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1288     }
1289
1290     @Test
1291     public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1292         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1293
1294         final String node1ID = "node1";
1295         final String node2ID = "node2";
1296
1297         final PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID)
1298                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1299                         ? actorFactory.createTestActorPath(node2ID) : null;
1300
1301         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1302         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1303         configParams.setElectionTimeoutFactor(3);
1304         configParams.setPeerAddressResolver(peerAddressResolver);
1305
1306         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1307                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1308         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1309
1310         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1311         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1312         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1313         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1314         InMemoryJournal.addEntry(node2ID, 3, new SimpleReplicatedLogEntry(1, 1,
1315                 new MockRaftActorContext.MockPayload("2")));
1316         InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
1317
1318         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1319                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1320                 actorFactory.generateActorId("collector"));
1321         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1322                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1323                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1324         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1325
1326         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1327                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1328                 actorFactory.generateActorId("collector"));
1329         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1330                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1331                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1332         final CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1333
1334         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1335         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1336         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1337         // forward the request to node2.
1338
1339         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1340                 ImmutableMap.of(node1ID, true, node2ID, true));
1341         node1RaftActorRef.tell(changeServers, testKit.getRef());
1342         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1343         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1344
1345         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1346         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1347                 votingServer(node1ID), votingServer(node2ID));
1348         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1349
1350         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1351         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1352                 votingServer(node1ID), votingServer(node2ID));
1353         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1354         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1355
1356         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1357     }
1358
1359     @Test
1360     public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1361         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1362
1363         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1364         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1365         configParams.setElectionTimeoutFactor(100000);
1366
1367         final String node1ID = "node1";
1368         final String node2ID = "node2";
1369
1370         configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID)
1371                 ? actorFactory.createTestActorPath(node1ID) : peerId.equals(node2ID)
1372                         ? actorFactory.createTestActorPath(node2ID) : null);
1373
1374         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1375                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1376         SimpleReplicatedLogEntry persistedServerConfigEntry = new SimpleReplicatedLogEntry(0, 1, persistedServerConfig);
1377
1378         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1379         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1380         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1381         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1382
1383         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1384                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1385                 actorFactory.generateActorId("collector"));
1386         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1387                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1388                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1389         final CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1390
1391         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1392                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1393                 actorFactory.generateActorId("collector"));
1394         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1395                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1396                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1397         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1398
1399         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1400         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1401         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1402         // request to node2 when node2 is elected.
1403
1404         node2RaftActor.setDropMessageOfType(RequestVote.class);
1405
1406         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1407                 node2ID, true));
1408         node1RaftActorRef.tell(changeServers, testKit.getRef());
1409
1410         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1411
1412         node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1413
1414         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1415         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1416
1417         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1418         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1419                 votingServer(node1ID), votingServer(node2ID));
1420         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1421         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1422
1423         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1424         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1425                 votingServer(node1ID), votingServer(node2ID));
1426         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1427
1428         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1429     }
1430
1431     private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1432         Stopwatch sw = Stopwatch.createStarted();
1433         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
1434             for (RaftActor raftActor : raftActors) {
1435                 if (raftActor.getRaftState() == expState) {
1436                     return;
1437                 }
1438             }
1439         }
1440
1441         fail("None of the RaftActors have state " + expState);
1442     }
1443
1444     private static ServerInfo votingServer(String id) {
1445         return new ServerInfo(id, true);
1446     }
1447
1448     private static ServerInfo nonVotingServer(String id) {
1449         return new ServerInfo(id, false);
1450     }
1451
1452     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1453         return newCollectorActor(leaderRaftActor, LEADER_ID);
1454     }
1455
1456     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1457         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1458                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1459                 actorFactory.generateActorId(id + "Collector"));
1460         raftActor.setCollectorActor(collectorActor);
1461         return collectorActor;
1462     }
1463
1464     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1465         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1466         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1467         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1468         assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1469     }
1470
1471     private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1472         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1473         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1474         configParams.setElectionTimeoutFactor(100000);
1475         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1476         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1477         termInfo.update(1, LEADER_ID);
1478         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1479                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
1480                 noPersistence, applyState -> actor.tell(applyState, actor), LOG);
1481     }
1482
1483     abstract static class AbstractMockRaftActor extends MockRaftActor {
1484         private volatile TestActorRef<MessageCollectorActor> collectorActor;
1485         private volatile Class<?> dropMessageOfType;
1486
1487         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1488                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1489             super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
1490                     .persistent(Optional.of(persistent)));
1491             this.collectorActor = collectorActor;
1492         }
1493
1494         void setDropMessageOfType(Class<?> dropMessageOfType) {
1495             this.dropMessageOfType = dropMessageOfType;
1496         }
1497
1498         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1499             this.collectorActor = collectorActor;
1500         }
1501
1502         @Override
1503         public void handleCommand(Object message) {
1504             if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1505                 super.handleCommand(message);
1506             }
1507
1508             if (collectorActor != null) {
1509                 collectorActor.tell(message, getSender());
1510             }
1511         }
1512     }
1513
1514     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1515
1516         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1517                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1518             super(id, peerAddresses, config, persistent, collectorActor);
1519             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1520                 @Override
1521                 public void createSnapshot(ActorRef actorRef) {
1522                     actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1523                 }
1524
1525                 @Override
1526                 public void applySnapshot(byte[] snapshotBytes) {
1527                 }
1528             };
1529         }
1530
1531         public static Props props(final String id, final Map<String, String> peerAddresses,
1532                 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1533
1534             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1535                     persistent, collectorActor);
1536         }
1537
1538     }
1539
1540     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1541         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1542                 RaftActorContext fromContext) {
1543             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1544             setPersistence(false);
1545
1546             RaftActorContext context = getRaftActorContext();
1547             for (int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1548                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1549                 getState().add(entry.getData());
1550                 context.getReplicatedLog().append(entry);
1551             }
1552
1553             context.setCommitIndex(fromContext.getCommitIndex());
1554             context.setLastApplied(fromContext.getLastApplied());
1555             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1556                     fromContext.getTermInformation().getVotedFor());
1557         }
1558
1559         @Override
1560         protected void initializeBehavior() {
1561             changeCurrentBehavior(new Leader(getRaftActorContext()));
1562             initializeBehaviorComplete.countDown();
1563         }
1564
1565         @Override
1566         @SuppressWarnings("checkstyle:IllegalCatch")
1567         public void createSnapshot(ActorRef actorRef) {
1568             try {
1569                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1570             } catch (Exception e) {
1571                 LOG.error("createSnapshot failed", e);
1572             }
1573         }
1574
1575         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1576             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1577             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1578             configParams.setElectionTimeoutFactor(10);
1579             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1580         }
1581     }
1582
1583     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1584         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1585             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE,
1586                     collectorActor);
1587             setPersistence(false);
1588         }
1589
1590         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1591             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1592         }
1593     }
1594 }