Add wait state for AddServer if snapshot in progress
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.base.Optional;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.After;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
32 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
35 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
38 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
39 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
40 import org.opendaylight.controller.cluster.raft.messages.AddServer;
41 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
43 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
45 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
46 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
47 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
48 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
49 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
50 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
51 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import scala.concurrent.duration.FiniteDuration;
55
56 /**
57  * Unit tests for RaftActorServerConfigurationSupport.
58  *
59  * @author Thomas Pantelis
60  */
61 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
62     static final String LEADER_ID = "leader";
63     static final String FOLLOWER_ID = "follower";
64     static final String NEW_SERVER_ID = "new-server";
65     static final String NEW_SERVER_ID2 = "new-server2";
66     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
67     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
68
69     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
70
71     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
72             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
73             actorFactory.generateActorId(FOLLOWER_ID));
74
75     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
76     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
77     private RaftActorContext newFollowerActorContext;
78
79     private final JavaTestKit testKit = new JavaTestKit(getSystem());
80
81     @Before
82     public void setup() {
83         InMemoryJournal.clear();
84         InMemorySnapshotStore.clear();
85
86         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
87
88         newFollowerCollectorActor = actorFactory.createTestActor(
89                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
90                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
91         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
92                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
93                 actorFactory.generateActorId(NEW_SERVER_ID));
94
95         try {
96             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
97         } catch (Exception e) {
98             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
99         }
100     }
101
102     private static DefaultConfigParamsImpl newFollowerConfigParams() {
103         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
104         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
105         configParams.setElectionTimeoutFactor(100000);
106         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
107         return configParams;
108     }
109
110     @After
111     public void tearDown() throws Exception {
112         actorFactory.close();
113     }
114
115     @Test
116     public void testAddServerWithExistingFollower() throws Exception {
117         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
118         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
119                 0, 3, 1).build());
120         followerActorContext.setCommitIndex(2);
121         followerActorContext.setLastApplied(2);
122
123         Follower follower = new Follower(followerActorContext);
124         followerActor.underlyingActor().setBehavior(follower);
125
126         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
127                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
128                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
129                 actorFactory.generateActorId(LEADER_ID));
130
131         // Expect initial heartbeat from the leader.
132         expectFirstMatching(followerActor, AppendEntries.class);
133         clearMessages(followerActor);
134
135         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
136
137         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
138
139         // Leader should install snapshot - capture and verify ApplySnapshot contents
140
141         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
142         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
143         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
144
145         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
146         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
147         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
148
149         // Verify ServerConfigurationPayload entry in leader's log
150
151         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
152         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
153         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
154         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
155         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
156                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
157
158         // Verify ServerConfigurationPayload entry in both followers
159
160         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
161         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
162                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
163
164         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
165         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
166                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
167
168         // Verify new server config was applied in both followers
169
170         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
171
172         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
173
174         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
175         expectFirstMatching(followerActor, ApplyState.class);
176
177         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
178         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
179         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
180         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
181     }
182
183     @Test
184     public void testAddServerWithNoExistingFollower() throws Exception {
185         RaftActorContext initialActorContext = new MockRaftActorContext();
186         initialActorContext.setCommitIndex(1);
187         initialActorContext.setLastApplied(1);
188         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
189                 0, 2, 1).build());
190
191         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
192                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
193                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
194                 actorFactory.generateActorId(LEADER_ID));
195
196         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
197         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
198
199         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
200
201         // Leader should install snapshot - capture and verify ApplySnapshot contents
202
203         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
204         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
205         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
206
207         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
208         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
209         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
210
211         // Verify ServerConfigurationPayload entry in leader's log
212
213         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
214         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
215         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
216         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
217                 votingServer(NEW_SERVER_ID));
218
219         // Verify ServerConfigurationPayload entry in the new follower
220
221         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
222         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
223         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
224                 votingServer(NEW_SERVER_ID));
225
226         // Verify new server config was applied in the new follower
227
228         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
229     }
230
231     @Test
232     public void testAddServersAsNonVoting() throws Exception {
233         RaftActorContext initialActorContext = new MockRaftActorContext();
234         initialActorContext.setCommitIndex(-1);
235         initialActorContext.setLastApplied(-1);
236         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
237
238         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
239                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
240                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
241                 actorFactory.generateActorId(LEADER_ID));
242
243         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
244         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
245
246         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
247
248         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
249         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
250         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
251
252         // Verify ServerConfigurationPayload entry in leader's log
253
254         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
255         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
256         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
257         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
258                 nonVotingServer(NEW_SERVER_ID));
259
260         // Verify ServerConfigurationPayload entry in the new follower
261
262         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
263         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
264         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
265                 nonVotingServer(NEW_SERVER_ID));
266
267         // Verify new server config was applied in the new follower
268
269         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
270
271         MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
272
273         // Add another non-voting server.
274
275         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
276         Follower newFollower2 = new Follower(follower2ActorContext);
277         followerActor.underlyingActor().setBehavior(newFollower2);
278
279         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
280
281         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
282         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
283         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
284
285         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
286         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
287         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
288         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
289                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
290     }
291
292     @Test
293     public void testAddServerWithOperationInProgress() throws Exception {
294         RaftActorContext initialActorContext = new MockRaftActorContext();
295         initialActorContext.setCommitIndex(-1);
296         initialActorContext.setLastApplied(-1);
297         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
298
299         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
300                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
301                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
302                 actorFactory.generateActorId(LEADER_ID));
303
304         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
305         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
306
307         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
308         Follower newFollower2 = new Follower(follower2ActorContext);
309         followerActor.underlyingActor().setBehavior(newFollower2);
310
311         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
312         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
313
314         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
315
316         // Wait for leader's install snapshot and capture it
317
318         Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
319
320         // Send a second AddServer - should get queued
321         JavaTestKit testKit2 = new JavaTestKit(getSystem());
322         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
323
324         // Continue the first AddServer
325         newFollowerRaftActorInstance.setDropMessageOfType(null);
326         newFollowerRaftActor.tell(installSnapshot, leaderActor);
327
328         // Verify both complete successfully
329         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
330         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
331
332         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
333         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
334
335         // Verify ServerConfigurationPayload entries in leader's log
336
337         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
338         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
339         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
340         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
341                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
342
343         // Verify ServerConfigurationPayload entry in the new follower
344
345         MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
346
347         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
348                newFollowerActorContext.getPeerIds());
349     }
350
351     @Test
352     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
353         RaftActorContext initialActorContext = new MockRaftActorContext();
354         initialActorContext.setCommitIndex(-1);
355         initialActorContext.setLastApplied(-1);
356         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
357
358         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
359                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
360                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
361                 actorFactory.generateActorId(LEADER_ID));
362
363         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
364         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
365
366         TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
367                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
368                 actorFactory.generateActorId(LEADER_ID + "Collector"));
369         leaderRaftActor.setCollectorActor(leaderCollectorActor);
370
371         // Drop commit message for now to delay snapshot completion
372         leaderRaftActor.setDropMessageOfType(String.class);
373
374         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
375
376         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
377
378         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
379
380         leaderRaftActor.setDropMessageOfType(null);
381         leaderActor.tell(commitMsg, leaderActor);
382
383         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
384         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
385         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
386
387         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
388
389         // Verify ServerConfigurationPayload entry in leader's log
390
391         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
392         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
393         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
394         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
395                 votingServer(NEW_SERVER_ID));
396     }
397
398     @Test
399     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
400         RaftActorContext initialActorContext = new MockRaftActorContext();
401         initialActorContext.setCommitIndex(-1);
402         initialActorContext.setLastApplied(-1);
403         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
404
405         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
406                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
407                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
408                 actorFactory.generateActorId(LEADER_ID));
409
410         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
411         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
412
413         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
414
415         TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
416                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
417                 actorFactory.generateActorId(LEADER_ID + "Collector"));
418         leaderRaftActor.setCollectorActor(leaderCollectorActor);
419
420         // Drop commit message so the snapshot doesn't complete.
421         leaderRaftActor.setDropMessageOfType(String.class);
422
423         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
424
425         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
426
427         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
428         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
429
430         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
431     }
432
433     @Test
434     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
435         RaftActorContext initialActorContext = new MockRaftActorContext();
436         initialActorContext.setCommitIndex(-1);
437         initialActorContext.setLastApplied(-1);
438         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
439
440         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
441                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
442                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
443                 actorFactory.generateActorId(LEADER_ID));
444
445         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
446         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
447         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
448
449         TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
450                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
451                 actorFactory.generateActorId(LEADER_ID + "Collector"));
452         leaderRaftActor.setCollectorActor(leaderCollectorActor);
453
454         // Drop the commit message so the snapshot doesn't complete yet.
455         leaderRaftActor.setDropMessageOfType(String.class);
456
457         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
458
459         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
460
461         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
462
463         // Change the leader behavior to follower
464         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
465
466         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
467         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
468         // isCapturing assertion below.
469         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
470
471         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
472         leaderActor.tell(commitMsg, leaderActor);
473
474         leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
475
476         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
477         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
478
479         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
480         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
481     }
482
483     @Test
484     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
485         RaftActorContext initialActorContext = new MockRaftActorContext();
486         initialActorContext.setCommitIndex(-1);
487         initialActorContext.setLastApplied(-1);
488         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
489
490         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
491                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
492                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
493                 actorFactory.generateActorId(LEADER_ID));
494
495         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
496         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
497
498         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
499
500         TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
501                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
502                 actorFactory.generateActorId(LEADER_ID + "Collector"));
503         leaderRaftActor.setCollectorActor(leaderCollectorActor);
504
505         // Drop the UnInitializedFollowerSnapshotReply to delay it.
506         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
507
508         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
509
510         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
511                 UnInitializedFollowerSnapshotReply.class);
512
513         // Prevent election timeout when the leader switches to follower
514         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
515
516         // Change the leader behavior to follower
517         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
518
519         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
520         leaderRaftActor.setDropMessageOfType(null);
521         leaderActor.tell(snapshotReply, leaderActor);
522
523         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
524         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
525
526         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
527     }
528
529     @Test
530     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
531         RaftActorContext initialActorContext = new MockRaftActorContext();
532         initialActorContext.setCommitIndex(-1);
533         initialActorContext.setLastApplied(-1);
534         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
535
536         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
537                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
538                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
539                 actorFactory.generateActorId(LEADER_ID));
540
541         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
542         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
543         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
544
545         // Drop the InstallSnapshot message so it times out
546         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
547
548         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
549
550         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
551
552         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
553         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
554
555         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
556         assertEquals("Leader followers size", 0,
557                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
558     }
559
560     @Test
561     public void testAddServerWithNoLeader() {
562         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
563         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
564
565         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
566                 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
567                         Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
568                 actorFactory.generateActorId(LEADER_ID));
569         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
570
571         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
572         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
573         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
574     }
575
576     @Test
577     public void testAddServerForwardedToLeader() {
578         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
579         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
580
581         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
582                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
583                 actorFactory.generateActorId(LEADER_ID));
584
585         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
586                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
587                         Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
588                 actorFactory.generateActorId(FOLLOWER_ID));
589         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
590
591         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
592                 -1, -1, (short)0), leaderActor);
593
594         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
595         expectFirstMatching(leaderActor, AddServer.class);
596     }
597
598     private ServerInfo votingServer(String id) {
599         return new ServerInfo(id, true);
600     }
601
602     private ServerInfo nonVotingServer(String id) {
603         return new ServerInfo(id, false);
604     }
605
606     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
607         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
608         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
609         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
610         assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
611     }
612
613     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
614         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
615         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
616         configParams.setElectionTimeoutFactor(100000);
617         ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
618         termInfo.update(1, LEADER_ID);
619         RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
620                 id, termInfo, -1, -1,
621                 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
622         followerActorContext.setCommitIndex(-1);
623         followerActorContext.setLastApplied(-1);
624         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
625
626         return followerActorContext;
627     }
628
629     static abstract class AbstractMockRaftActor extends MockRaftActor {
630         private volatile TestActorRef<MessageCollectorActor> collectorActor;
631         private volatile Class<?> dropMessageOfType;
632
633         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
634                 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
635             super(id, peerAddresses, config, dataPersistenceProvider);
636             this.collectorActor = collectorActor;
637         }
638
639         void setDropMessageOfType(Class<?> dropMessageOfType) {
640             this.dropMessageOfType = dropMessageOfType;
641         }
642
643         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
644             this.collectorActor = collectorActor;
645         }
646
647         @Override
648         public void handleCommand(Object message) {
649             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
650                 super.handleCommand(message);
651             }
652
653             if(collectorActor != null) {
654                 collectorActor.tell(message, getSender());
655             }
656         }
657     }
658
659     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
660         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
661                 RaftActorContext fromContext) {
662             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
663             setPersistence(false);
664
665             RaftActorContext context = getRaftActorContext();
666             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
667                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
668                 getState().add(entry.getData());
669                 context.getReplicatedLog().append(entry);
670             }
671
672             context.setCommitIndex(fromContext.getCommitIndex());
673             context.setLastApplied(fromContext.getLastApplied());
674             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
675                     fromContext.getTermInformation().getVotedFor());
676         }
677
678         @Override
679         protected void initializeBehavior() {
680             changeCurrentBehavior(new Leader(getRaftActorContext()));
681             initializeBehaviorComplete.countDown();
682         }
683
684         @Override
685         public void createSnapshot(ActorRef actorRef) {
686             try {
687                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
688             } catch (Exception e) {
689                 LOG.error("createSnapshot failed", e);
690             }
691         }
692
693         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
694             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
695             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
696             configParams.setElectionTimeoutFactor(10);
697             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
698         }
699     }
700
701     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
702         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
703             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
704         }
705
706         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
707             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
708         }
709     }
710 }