Remove snapshot after startup and fix related bug
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
15 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
16 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Optional;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Maps;
27 import com.google.common.collect.Sets;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
41 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
43 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
45 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
46 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
47 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
48 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
49 import org.opendaylight.controller.cluster.raft.messages.AddServer;
50 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
51 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
52 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
53 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
55 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
56 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
57 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
58 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
59 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
60 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
62 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
63 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
64 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68 import scala.concurrent.duration.FiniteDuration;
69
70 /**
71  * Unit tests for RaftActorServerConfigurationSupport.
72  *
73  * @author Thomas Pantelis
74  */
75 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
76     static final String LEADER_ID = "leader";
77     static final String FOLLOWER_ID = "follower";
78     static final String FOLLOWER_ID2 = "follower2";
79     static final String NEW_SERVER_ID = "new-server";
80     static final String NEW_SERVER_ID2 = "new-server2";
81     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
82     private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
83     private static final boolean NO_PERSISTENCE = false;
84     private static final boolean PERSISTENT = true;
85
86     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
87
88     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
89             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
90             actorFactory.generateActorId(FOLLOWER_ID));
91
92     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
93     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
94     private RaftActorContext newFollowerActorContext;
95
96     private final JavaTestKit testKit = new JavaTestKit(getSystem());
97
98     @Before
99     public void setup() {
100         InMemoryJournal.clear();
101         InMemorySnapshotStore.clear();
102     }
103
104     private void setupNewFollower() {
105         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
106
107         newFollowerCollectorActor = actorFactory.createTestActor(
108                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
109                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
110         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
111                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
112                 actorFactory.generateActorId(NEW_SERVER_ID));
113
114         try {
115             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
116         } catch (Exception e) {
117             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
118         }
119     }
120
121     private static DefaultConfigParamsImpl newFollowerConfigParams() {
122         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
123         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
124         configParams.setElectionTimeoutFactor(100000);
125         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
126         return configParams;
127     }
128
129     @After
130     public void tearDown() throws Exception {
131         actorFactory.close();
132     }
133
134     @Test
135     public void testAddServerWithExistingFollower() throws Exception {
136         LOG.info("testAddServerWithExistingFollower starting");
137         setupNewFollower();
138         RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
139         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
140                 0, 3, 1).build());
141         followerActorContext.setCommitIndex(2);
142         followerActorContext.setLastApplied(2);
143
144         Follower follower = new Follower(followerActorContext);
145         followerActor.underlyingActor().setBehavior(follower);
146         followerActorContext.setCurrentBehavior(follower);
147
148         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
149                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
150                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
151                 actorFactory.generateActorId(LEADER_ID));
152
153         // Expect initial heartbeat from the leader.
154         expectFirstMatching(followerActor, AppendEntries.class);
155         clearMessages(followerActor);
156
157         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
158         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
159
160         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
161
162         // Leader should install snapshot - capture and verify ApplySnapshot contents
163
164         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
165         @SuppressWarnings("unchecked")
166         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
167         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
168
169         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
170         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
171         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
172
173         // Verify ServerConfigurationPayload entry in leader's log
174
175         expectFirstMatching(leaderCollectorActor, ApplyState.class);
176         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
177         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
178         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
179         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
180         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
181                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
182
183         // Verify ServerConfigurationPayload entry in both followers
184
185         expectFirstMatching(followerActor, ApplyState.class);
186         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
187         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
188                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
189
190         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
191         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
192         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
193                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
194
195         // Verify new server config was applied in both followers
196
197         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
198
199         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
200
201         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
202         expectFirstMatching(followerActor, ApplyState.class);
203
204         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
205         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
206         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
207         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
208
209         List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
210         assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
211         ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
212         assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
213         assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
214         assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
215
216         persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
217         assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
218         logEntry = persistedLogEntries.get(0);
219         assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
220         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
221         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
222                 logEntry.getData().getClass());
223
224         LOG.info("testAddServerWithExistingFollower ending");
225     }
226
227     @Test
228     public void testAddServerWithNoExistingFollower() throws Exception {
229         LOG.info("testAddServerWithNoExistingFollower starting");
230
231         setupNewFollower();
232         RaftActorContext initialActorContext = new MockRaftActorContext();
233         initialActorContext.setCommitIndex(1);
234         initialActorContext.setLastApplied(1);
235         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
236                 0, 2, 1).build());
237
238         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
239                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
240                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
241                 actorFactory.generateActorId(LEADER_ID));
242
243         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
244         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
245
246         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
247
248         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
249
250         // Leader should install snapshot - capture and verify ApplySnapshot contents
251
252         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
253         @SuppressWarnings("unchecked")
254         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
255         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
256
257         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
258         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
259         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
260
261         // Verify ServerConfigurationPayload entry in leader's log
262
263         expectFirstMatching(leaderCollectorActor, ApplyState.class);
264         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
265         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
266         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
267         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
268                 votingServer(NEW_SERVER_ID));
269
270         // Verify ServerConfigurationPayload entry in the new follower
271
272         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
273         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
274         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
275                 votingServer(NEW_SERVER_ID));
276
277         // Verify new server config was applied in the new follower
278
279         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
280
281         LOG.info("testAddServerWithNoExistingFollower ending");
282     }
283
284     @Test
285     public void testAddServersAsNonVoting() throws Exception {
286         LOG.info("testAddServersAsNonVoting starting");
287
288         setupNewFollower();
289         RaftActorContext initialActorContext = new MockRaftActorContext();
290
291         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
292                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
293                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
294                 actorFactory.generateActorId(LEADER_ID));
295
296         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
297         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
298
299         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
300
301         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
302
303         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
304         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
305         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
306
307         // Verify ServerConfigurationPayload entry in leader's log
308
309         expectFirstMatching(leaderCollectorActor, ApplyState.class);
310
311         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
312         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
313         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
314         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
315                 nonVotingServer(NEW_SERVER_ID));
316
317         // Verify ServerConfigurationPayload entry in the new follower
318
319         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
320         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
321         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
322                 nonVotingServer(NEW_SERVER_ID));
323
324         // Verify new server config was applied in the new follower
325
326         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
327
328         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
329
330         // Add another non-voting server.
331
332         clearMessages(leaderCollectorActor);
333
334         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
335         Follower newFollower2 = new Follower(follower2ActorContext);
336         followerActor.underlyingActor().setBehavior(newFollower2);
337
338         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
339
340         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
341         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
342         assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
343
344         expectFirstMatching(leaderCollectorActor, ApplyState.class);
345         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
346         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
347         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
348         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
349                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
350
351         LOG.info("testAddServersAsNonVoting ending");
352     }
353
354     @Test
355     public void testAddServerWithOperationInProgress() throws Exception {
356         LOG.info("testAddServerWithOperationInProgress starting");
357
358         setupNewFollower();
359         RaftActorContext initialActorContext = new MockRaftActorContext();
360
361         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
362                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
363                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
364                 actorFactory.generateActorId(LEADER_ID));
365
366         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
367         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
368
369         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
370
371         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
372         Follower newFollower2 = new Follower(follower2ActorContext);
373         followerActor.underlyingActor().setBehavior(newFollower2);
374
375         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
376         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
377
378         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
379
380         // Wait for leader's install snapshot and capture it
381
382         InstallSnapshot installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
383
384         // Send a second AddServer - should get queued
385         JavaTestKit testKit2 = new JavaTestKit(getSystem());
386         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
387
388         // Continue the first AddServer
389         newFollowerRaftActorInstance.setDropMessageOfType(null);
390         newFollowerRaftActor.tell(installSnapshot, leaderActor);
391
392         // Verify both complete successfully
393         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
394         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
395
396         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
397         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
398
399         // Verify ServerConfigurationPayload entries in leader's log
400
401         expectMatching(leaderCollectorActor, ApplyState.class, 2);
402         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
403         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
404         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
405         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
406                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
407
408         // Verify ServerConfigurationPayload entry in the new follower
409
410         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
411         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
412                newFollowerActorContext.getPeerIds());
413
414         LOG.info("testAddServerWithOperationInProgress ending");
415     }
416
417     @Test
418     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
419         LOG.info("testAddServerWithPriorSnapshotInProgress starting");
420
421         setupNewFollower();
422         RaftActorContext initialActorContext = new MockRaftActorContext();
423
424         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
425                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
426                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
427                 actorFactory.generateActorId(LEADER_ID));
428
429         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
430         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
431
432         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
433
434         // Drop commit message for now to delay snapshot completion
435         leaderRaftActor.setDropMessageOfType(String.class);
436
437         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
438
439         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
440
441         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
442
443         leaderRaftActor.setDropMessageOfType(null);
444         leaderActor.tell(commitMsg, leaderActor);
445
446         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
447         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
448         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
449
450         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
451
452         // Verify ServerConfigurationPayload entry in leader's log
453
454         expectFirstMatching(leaderCollectorActor, ApplyState.class);
455         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
456         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
457         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
458         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
459                 votingServer(NEW_SERVER_ID));
460
461         LOG.info("testAddServerWithPriorSnapshotInProgress ending");
462     }
463
464     @Test
465     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
466         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
467
468         setupNewFollower();
469         RaftActorContext initialActorContext = new MockRaftActorContext();
470
471         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
472                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
473                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
474                 actorFactory.generateActorId(LEADER_ID));
475
476         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
477         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
478
479         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
480
481         // Drop commit message so the snapshot doesn't complete.
482         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
483
484         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
485
486         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
487
488         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
489         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
490
491         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
492
493         LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
494     }
495
496     @Test
497     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
498         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
499
500         setupNewFollower();
501         RaftActorContext initialActorContext = new MockRaftActorContext();
502
503         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
504                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
505                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
506                 actorFactory.generateActorId(LEADER_ID));
507
508         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
509         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
510         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
511
512         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
513
514         // Drop the commit message so the snapshot doesn't complete yet.
515         leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
516
517         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
518
519         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
520
521         Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
522
523         // Change the leader behavior to follower
524         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
525
526         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
527         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
528         // isCapturing assertion below.
529         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
530
531         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
532         leaderActor.tell(commitMsg, leaderActor);
533
534         leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
535
536         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
537         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
538
539         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
540         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
541
542         LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
543     }
544
545     @Test
546     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
547         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
548
549         setupNewFollower();
550         RaftActorContext initialActorContext = new MockRaftActorContext();
551
552         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
553                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
554                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
555                 actorFactory.generateActorId(LEADER_ID));
556
557         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
558         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
559
560         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
561
562         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
563
564         // Drop the UnInitializedFollowerSnapshotReply to delay it.
565         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
566
567         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
568
569         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
570                 UnInitializedFollowerSnapshotReply.class);
571
572         // Prevent election timeout when the leader switches to follower
573         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
574
575         // Change the leader behavior to follower
576         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
577
578         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
579         leaderRaftActor.setDropMessageOfType(null);
580         leaderActor.tell(snapshotReply, leaderActor);
581
582         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
583         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
584
585         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
586
587         LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
588     }
589
590     @Test
591     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
592         LOG.info("testAddServerWithInstallSnapshotTimeout starting");
593
594         setupNewFollower();
595         RaftActorContext initialActorContext = new MockRaftActorContext();
596
597         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
598                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
599                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
600                 actorFactory.generateActorId(LEADER_ID));
601
602         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
603         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
604         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
605
606         // Drop the InstallSnapshot message so it times out
607         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
608
609         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
610
611         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
612
613         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
614         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
615
616         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
617         assertEquals("Leader followers size", 0,
618                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
619
620         LOG.info("testAddServerWithInstallSnapshotTimeout ending");
621     }
622
623     @Test
624     public void testAddServerWithNoLeader() {
625         LOG.info("testAddServerWithNoLeader starting");
626
627         setupNewFollower();
628         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
629         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
630
631         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
632                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
633                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
634                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
635                 actorFactory.generateActorId(LEADER_ID));
636         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
637
638         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
639         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
640         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
641
642         LOG.info("testAddServerWithNoLeader ending");
643     }
644
645     @Test
646     public void testAddServerWithNoConsensusReached() {
647         LOG.info("testAddServerWithNoConsensusReached starting");
648
649         setupNewFollower();
650         RaftActorContext initialActorContext = new MockRaftActorContext();
651
652         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
653                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
654                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
655                 actorFactory.generateActorId(LEADER_ID));
656
657         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
658         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
659
660         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
661
662         // Drop UnInitializedFollowerSnapshotReply initially
663         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
664
665         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
666         TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
667                 newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
668
669         // Drop AppendEntries to the new follower so consensus isn't reached
670         newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
671
672         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
673
674         // Capture the UnInitializedFollowerSnapshotReply
675         Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
676
677         // Send the UnInitializedFollowerSnapshotReply to resume the first request
678         leaderRaftActor.setDropMessageOfType(null);
679         leaderActor.tell(snapshotReply, leaderActor);
680
681         expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
682
683         // Send a second AddServer
684         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
685
686         // The first AddServer should succeed with OK even though consensus wasn't reached
687         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
688         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
689         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint().get());
690
691         // Verify ServerConfigurationPayload entry in leader's log
692         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
693                 votingServer(NEW_SERVER_ID));
694
695         // The second AddServer should fail since consensus wasn't reached for the first
696         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
697         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
698
699         // Re-send the second AddServer - should also fail
700         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
701         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
702         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
703
704         LOG.info("testAddServerWithNoConsensusReached ending");
705     }
706
707     @Test
708     public void testAddServerWithExistingServer() {
709         LOG.info("testAddServerWithExistingServer starting");
710
711         RaftActorContext initialActorContext = new MockRaftActorContext();
712
713         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
714                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
715                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
716                 actorFactory.generateActorId(LEADER_ID));
717
718         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
719
720         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
721         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
722
723         LOG.info("testAddServerWithExistingServer ending");
724     }
725
726     @Test
727     public void testAddServerForwardedToLeader() {
728         LOG.info("testAddServerForwardedToLeader starting");
729
730         setupNewFollower();
731         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
732         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
733
734         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
735                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
736                 actorFactory.generateActorId(LEADER_ID));
737
738         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
739                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
740                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
741                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
742                 actorFactory.generateActorId(FOLLOWER_ID));
743         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
744
745         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
746                 -1, -1, (short)0), leaderActor);
747
748         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
749         expectFirstMatching(leaderActor, AddServer.class);
750
751         LOG.info("testAddServerForwardedToLeader ending");
752     }
753
754     @Test
755     public void testOnApplyState() {
756         LOG.info("testOnApplyState starting");
757
758         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
759         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
760         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
761                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
762                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
763                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
764                 actorFactory.generateActorId(LEADER_ID));
765
766         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
767
768         ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
769                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
770         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
771         assertEquals("Message handled", true, handled);
772
773         ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
774                 new MockRaftActorContext.MockPayload("1"));
775         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
776         assertEquals("Message handled", false, handled);
777
778         LOG.info("testOnApplyState ending");
779     }
780
781     @Test
782     public void testRemoveServerWithNoLeader() {
783         LOG.info("testRemoveServerWithNoLeader starting");
784
785         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
786         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
787
788         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
789                 MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
790                         followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
791                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
792                 actorFactory.generateActorId(LEADER_ID));
793         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
794
795         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
796         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
797         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
798
799         LOG.info("testRemoveServerWithNoLeader ending");
800     }
801
802     @Test
803     public void testRemoveServerNonExistentServer() {
804         LOG.info("testRemoveServerNonExistentServer starting");
805
806         RaftActorContext initialActorContext = new MockRaftActorContext();
807
808         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
809                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
810                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
811                 actorFactory.generateActorId(LEADER_ID));
812
813         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
814         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
815         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
816
817         LOG.info("testRemoveServerNonExistentServer ending");
818     }
819
820     @Test
821     public void testRemoveServerForwardToLeader() {
822         LOG.info("testRemoveServerForwardToLeader starting");
823
824         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
825         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
826
827         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
828                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
829                 actorFactory.generateActorId(LEADER_ID));
830
831         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
832                 MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
833                         leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
834                         props().withDispatcher(Dispatchers.DefaultDispatcherId()),
835                 actorFactory.generateActorId(FOLLOWER_ID));
836         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
837
838         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
839                 -1, -1, (short)0), leaderActor);
840
841         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
842         expectFirstMatching(leaderActor, RemoveServer.class);
843
844         LOG.info("testRemoveServerForwardToLeader ending");
845     }
846
847     @Test
848     public void testRemoveServer() {
849         LOG.info("testRemoveServer starting");
850
851         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
852         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
853         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
854
855         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
856         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
857         RaftActorContext initialActorContext = new MockRaftActorContext();
858
859         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
860                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
861                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
862                 actorFactory.generateActorId(LEADER_ID));
863
864         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
865
866         TestActorRef<MessageCollectorActor> collector =
867                 actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
868                         actorFactory.generateActorId("collector"));
869         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
870                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
871                         configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
872                 followerActorId);
873
874         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
875         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
876         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
877
878         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
879         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
880         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
881
882         RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
883         assertTrue("Expected Leader", currentBehavior instanceof Leader);
884         assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
885
886         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
887
888         LOG.info("testRemoveServer ending");
889     }
890
891     @Test
892     public void testRemoveServerLeader() {
893         LOG.info("testRemoveServerLeader starting");
894
895         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
896         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
897         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
898
899         final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
900         final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
901         RaftActorContext initialActorContext = new MockRaftActorContext();
902
903         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
904                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
905                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
906                 actorFactory.generateActorId(LEADER_ID));
907
908         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
909
910         TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
911                 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
912         actorFactory.createTestActor(
913                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
914                         configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
915                 followerActorId);
916
917         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
918         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
919         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
920
921         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(followerCollector, ApplyState.class);
922         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
923         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
924                 votingServer(FOLLOWER_ID));
925
926         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
927
928         LOG.info("testRemoveServerLeader ending");
929     }
930
931     @Test
932     public void testRemoveServerLeaderWithNoFollowers() {
933         LOG.info("testRemoveServerLeaderWithNoFollowers starting");
934
935         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
936                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
937                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
938                 actorFactory.generateActorId(LEADER_ID));
939
940         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
941         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
942         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
943
944         LOG.info("testRemoveServerLeaderWithNoFollowers ending");
945     }
946
947     @Test
948     public void testChangeServersVotingStatus() {
949         LOG.info("testChangeServersVotingStatus starting");
950
951         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
952         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
953         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
954
955         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
956         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
957         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
958         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
959
960         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
961                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
962                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
963                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
964         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
965
966         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
967                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
968                 actorFactory.generateActorId("collector"));
969         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
970                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
971                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
972                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
973
974         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
975                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
976                 actorFactory.generateActorId("collector"));
977         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
978                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
979                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
980                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
981
982         // Send first ChangeServersVotingStatus message
983
984         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, false, FOLLOWER_ID2, false)),
985                 testKit.getRef());
986         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
987         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
988
989         final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
990         assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
991         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
992                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
993
994         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
995         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
996                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
997
998         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
999         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1000                 votingServer(LEADER_ID), nonVotingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1001
1002         MessageCollectorActor.clearMessages(leaderCollector);
1003         MessageCollectorActor.clearMessages(follower1Collector);
1004         MessageCollectorActor.clearMessages(follower2Collector);
1005
1006         // Send second ChangeServersVotingStatus message
1007
1008         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(FOLLOWER_ID, true)), testKit.getRef());
1009         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1010         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1011
1012         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1013         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1014                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1015
1016         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1017         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1018                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1019
1020         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1021         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1022                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
1023
1024         LOG.info("testChangeServersVotingStatus ending");
1025     }
1026
1027     @Test
1028     public void testChangeLeaderToNonVoting() {
1029         LOG.info("testChangeLeaderToNonVoting starting");
1030
1031         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1032         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
1033
1034         final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
1035         final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
1036         final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
1037         final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
1038
1039         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1040                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, follower1ActorPath,
1041                         FOLLOWER_ID2, follower2ActorPath), new MockRaftActorContext()).
1042                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1043         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
1044
1045         TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
1046                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1047                 actorFactory.generateActorId("collector"));
1048         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
1049                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1050                         FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
1051                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
1052
1053         TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
1054                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1055                 actorFactory.generateActorId("collector"));
1056         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
1057                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
1058                         FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
1059                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
1060
1061         // Send ChangeServersVotingStatus message
1062
1063         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1064         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1065         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1066
1067         MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
1068         verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1069                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1070
1071         MessageCollectorActor.expectFirstMatching(follower1Collector, ApplyState.class);
1072         verifyServerConfigurationPayloadEntry(follower1RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1073                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1074
1075         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
1076         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
1077                 nonVotingServer(LEADER_ID), votingServer(FOLLOWER_ID), votingServer(FOLLOWER_ID2));
1078
1079         verifyRaftState(RaftState.Leader, follower1RaftActor.underlyingActor(), follower2RaftActor.underlyingActor());
1080         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
1081
1082         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
1083
1084         LOG.info("testChangeLeaderToNonVoting ending");
1085     }
1086
1087     @Test
1088     public void testChangeLeaderToNonVotingInSingleNode() {
1089         LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
1090
1091         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
1092                 MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
1093                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
1094
1095         leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
1096         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1097         assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
1098
1099         LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
1100     }
1101
1102     @Test
1103     public void testChangeToVotingWithNoLeader() {
1104         LOG.info("testChangeToVotingWithNoLeader starting");
1105
1106         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1107         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1108
1109         final String node1ID = "node1";
1110         final String node2ID = "node2";
1111
1112         // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
1113         // via the server config. The server config will also contain 2 voting peers that are down (ie no
1114         // actors created).
1115
1116         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1117                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
1118                 new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
1119         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1120
1121         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
1122         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1123         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
1124         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1125
1126         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1127                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1128                 actorFactory.generateActorId("collector"));
1129         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1130                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1131                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1132         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1133
1134         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1135                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1136                 actorFactory.generateActorId("collector"));
1137         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1138                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1139                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1140         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1141
1142         node1RaftActor.waitForInitializeBehaviorComplete();
1143         node2RaftActor.waitForInitializeBehaviorComplete();
1144
1145         // Verify the intended server config was loaded and applied.
1146         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1147                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1148                 votingServer("downNode2"));
1149         assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
1150         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1151         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
1152
1153         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1154                 nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
1155                 votingServer("downNode2"));
1156         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
1157
1158         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
1159         // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
1160         // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
1161         // down nodes are switched to non-voting, node1 should only need a vote from node2.
1162
1163         // First send the message such that node1 has no peer address for node2 - should fail.
1164
1165         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1166                 node2ID, true, "downNode1", false, "downNode2", false));
1167         node1RaftActorRef.tell(changeServers, testKit.getRef());
1168         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1169         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1170
1171         // Update node2's peer address and send the message again
1172
1173         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
1174
1175         node1RaftActorRef.tell(changeServers, testKit.getRef());
1176         reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1177         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1178
1179         ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1180         assertEquals("getToIndex", 1, apply.getToIndex());
1181         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1182                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1183                 nonVotingServer("downNode2"));
1184         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1185         assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
1186
1187         apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1188         assertEquals("getToIndex", 1, apply.getToIndex());
1189         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1190                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
1191                 nonVotingServer("downNode2"));
1192         assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
1193         assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
1194
1195         LOG.info("testChangeToVotingWithNoLeader ending");
1196     }
1197
1198     @Test
1199     public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
1200         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
1201
1202         final String node1ID = "node1";
1203         final String node2ID = "node2";
1204
1205         PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1206             peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1207
1208         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1209                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1210         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1211
1212         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1213         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1214         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1215         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1216
1217         DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
1218         configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1219         configParams1.setElectionTimeoutFactor(1);
1220         configParams1.setPeerAddressResolver(peerAddressResolver);
1221         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1222                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1223                 actorFactory.generateActorId("collector"));
1224         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1225                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
1226                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1227         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1228
1229         DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
1230         configParams2.setElectionTimeoutFactor(1000000);
1231         configParams2.setPeerAddressResolver(peerAddressResolver);
1232         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1233                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1234                 actorFactory.generateActorId("collector"));
1235         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1236                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
1237                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1238         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1239
1240         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1241         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1242         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
1243         // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
1244         // node2 was previously voting.
1245
1246         node2RaftActor.setDropMessageOfType(RequestVote.class);
1247
1248         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
1249         node1RaftActorRef.tell(changeServers, testKit.getRef());
1250         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1251         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
1252
1253         assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
1254                 Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
1255         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1256
1257         LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
1258     }
1259
1260     @Test
1261     public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
1262         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
1263
1264         final String node1ID = "node1";
1265         final String node2ID = "node2";
1266
1267         PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1268             peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
1269
1270         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1271         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1272         configParams.setElectionTimeoutFactor(3);
1273         configParams.setPeerAddressResolver(peerAddressResolver);
1274
1275         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1276                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
1277         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1278
1279         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1280         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1281         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1282         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1283         InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
1284                 new MockRaftActorContext.MockPayload("2")));
1285
1286         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1287                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1288                 actorFactory.generateActorId("collector"));
1289         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1290                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1291                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1292         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1293
1294         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1295                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1296                 actorFactory.generateActorId("collector"));
1297         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1298                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1299                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1300         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1301
1302         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
1303         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
1304         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
1305         // forward the request to node2.
1306
1307         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
1308                 ImmutableMap.of(node1ID, true, node2ID, true));
1309         node1RaftActorRef.tell(changeServers, testKit.getRef());
1310         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1311         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1312
1313         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1314         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1315                 votingServer(node1ID), votingServer(node2ID));
1316         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1317
1318         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1319         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1320                 votingServer(node1ID), votingServer(node2ID));
1321         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1322         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1323
1324         LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
1325     }
1326
1327     @Test
1328     public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
1329         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
1330
1331         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1332         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1333         configParams.setElectionTimeoutFactor(100000);
1334
1335         final String node1ID = "node1";
1336         final String node2ID = "node2";
1337
1338         configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
1339             peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null);
1340
1341         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1342                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
1343         ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
1344
1345         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
1346         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
1347         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
1348         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
1349
1350         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
1351                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1352                 actorFactory.generateActorId("collector"));
1353         TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
1354                 CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
1355                         PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
1356         CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
1357
1358         TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
1359                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1360                 actorFactory.generateActorId("collector"));
1361         TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
1362                 CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
1363                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
1364         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
1365
1366         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
1367         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
1368         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
1369         // request to node2 when node2 is elected.
1370
1371         node2RaftActor.setDropMessageOfType(RequestVote.class);
1372
1373         ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
1374                 node2ID, true));
1375         node1RaftActorRef.tell(changeServers, testKit.getRef());
1376
1377         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
1378
1379         node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
1380
1381         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
1382         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
1383
1384         MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
1385         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
1386                 votingServer(node1ID), votingServer(node2ID));
1387         assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
1388         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
1389
1390         MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
1391         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
1392                 votingServer(node1ID), votingServer(node2ID));
1393         assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
1394
1395         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
1396     }
1397
1398     private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
1399         Stopwatch sw = Stopwatch.createStarted();
1400         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
1401             for(RaftActor raftActor: raftActors) {
1402                 if(raftActor.getRaftState() == expState) {
1403                     return;
1404                 }
1405             }
1406         }
1407
1408         fail("None of the RaftActors have state " + expState);
1409     }
1410
1411     private static ServerInfo votingServer(String id) {
1412         return new ServerInfo(id, true);
1413     }
1414
1415     private static ServerInfo nonVotingServer(String id) {
1416         return new ServerInfo(id, false);
1417     }
1418
1419     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
1420         return newCollectorActor(leaderRaftActor, LEADER_ID);
1421     }
1422
1423     private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
1424         TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
1425                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1426                 actorFactory.generateActorId(id + "Collector"));
1427         raftActor.setCollectorActor(collectorActor);
1428         return collectorActor;
1429     }
1430
1431     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
1432         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
1433         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
1434         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
1435         assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
1436     }
1437
1438     private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
1439         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1440         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1441         configParams.setElectionTimeoutFactor(100000);
1442         NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
1443         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
1444         termInfo.update(1, LEADER_ID);
1445         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
1446                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
1447     }
1448
1449     static abstract class AbstractMockRaftActor extends MockRaftActor {
1450         private volatile TestActorRef<MessageCollectorActor> collectorActor;
1451         private volatile Class<?> dropMessageOfType;
1452
1453         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1454                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1455             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
1456                     persistent(Optional.of(persistent)));
1457             this.collectorActor = collectorActor;
1458         }
1459
1460         void setDropMessageOfType(Class<?> dropMessageOfType) {
1461             this.dropMessageOfType = dropMessageOfType;
1462         }
1463
1464         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
1465             this.collectorActor = collectorActor;
1466         }
1467
1468         @Override
1469         public void handleCommand(Object message) {
1470             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
1471                 super.handleCommand(message);
1472             }
1473
1474             if(collectorActor != null) {
1475                 collectorActor.tell(message, getSender());
1476             }
1477         }
1478     }
1479
1480     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
1481
1482         CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
1483                 boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
1484             super(id, peerAddresses, config, persistent, collectorActor);
1485             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
1486                 @Override
1487                 public void createSnapshot(ActorRef actorRef) {
1488                     actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
1489                 }
1490
1491                 @Override
1492                 public void applySnapshot(byte[] snapshotBytes) {
1493                 }
1494             };
1495         }
1496
1497         public static Props props(final String id, final Map<String, String> peerAddresses,
1498                 ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
1499
1500             return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
1501                     persistent, collectorActor);
1502         }
1503
1504     }
1505
1506     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
1507         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
1508                 RaftActorContext fromContext) {
1509             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
1510             setPersistence(false);
1511
1512             RaftActorContext context = getRaftActorContext();
1513             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
1514                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
1515                 getState().add(entry.getData());
1516                 context.getReplicatedLog().append(entry);
1517             }
1518
1519             context.setCommitIndex(fromContext.getCommitIndex());
1520             context.setLastApplied(fromContext.getLastApplied());
1521             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
1522                     fromContext.getTermInformation().getVotedFor());
1523         }
1524
1525         @Override
1526         protected void initializeBehavior() {
1527             changeCurrentBehavior(new Leader(getRaftActorContext()));
1528             initializeBehaviorComplete.countDown();
1529         }
1530
1531         @Override
1532         public void createSnapshot(ActorRef actorRef) {
1533             try {
1534                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
1535             } catch (Exception e) {
1536                 LOG.error("createSnapshot failed", e);
1537             }
1538         }
1539
1540         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
1541             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1542             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
1543             configParams.setElectionTimeoutFactor(10);
1544             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
1545         }
1546     }
1547
1548     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
1549         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1550             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
1551             setPersistence(false);
1552         }
1553
1554         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
1555             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
1556         }
1557     }
1558 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.