Always persist ServerConfigurationPayload log entries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import com.google.common.base.Optional;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.After;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
32 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
35 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
38 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
39 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
40 import org.opendaylight.controller.cluster.raft.messages.AddServer;
41 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
43 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
45 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
46 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
47 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
48 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
49 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
50 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
51 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import scala.concurrent.duration.FiniteDuration;
55
56 /**
57  * Unit tests for RaftActorServerConfigurationSupport.
58  *
59  * @author Thomas Pantelis
60  */
61 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
62     static final String LEADER_ID = "leader";
63     static final String FOLLOWER_ID = "follower";
64     static final String NEW_SERVER_ID = "new-server";
65     static final String NEW_SERVER_ID2 = "new-server2";
66     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
67     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
68
69     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
70
71     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
72             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
73             actorFactory.generateActorId(FOLLOWER_ID));
74
75     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
76     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
77     private RaftActorContext newFollowerActorContext;
78
79     private final JavaTestKit testKit = new JavaTestKit(getSystem());
80
81     @Before
82     public void setup() {
83         InMemoryJournal.clear();
84         InMemorySnapshotStore.clear();
85
86         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
87
88         newFollowerCollectorActor = actorFactory.createTestActor(
89                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
90                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
91         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
92                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
93                 actorFactory.generateActorId(NEW_SERVER_ID));
94
95         try {
96             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
97         } catch (Exception e) {
98             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
99         }
100     }
101
102     private static DefaultConfigParamsImpl newFollowerConfigParams() {
103         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
104         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
105         configParams.setElectionTimeoutFactor(100000);
106         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
107         return configParams;
108     }
109
110     @After
111     public void tearDown() throws Exception {
112         actorFactory.close();
113     }
114
115     @Test
116     public void testAddServerWithExistingFollower() throws Exception {
117         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
118         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
119                 0, 3, 1).build());
120         followerActorContext.setCommitIndex(2);
121         followerActorContext.setLastApplied(2);
122
123         Follower follower = new Follower(followerActorContext);
124         followerActor.underlyingActor().setBehavior(follower);
125
126         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
127                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
128                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
129                 actorFactory.generateActorId(LEADER_ID));
130
131         // Expect initial heartbeat from the leader.
132         expectFirstMatching(followerActor, AppendEntries.class);
133         clearMessages(followerActor);
134
135         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
136
137         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
138
139         // Leader should install snapshot - capture and verify ApplySnapshot contents
140
141         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
142         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
143         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
144
145         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
146         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
147         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
148
149         // Verify ServerConfigurationPayload entry in leader's log
150
151         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
152         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
153         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
154         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
155         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
156                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
157
158         // Verify ServerConfigurationPayload entry in both followers
159
160         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
161         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
162                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
163
164         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
165         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
166                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
167
168         // Verify new server config was applied in both followers
169
170         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
171
172         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
173
174         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
175         expectFirstMatching(followerActor, ApplyState.class);
176
177         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
178         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
179         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
180         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
181
182         List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
183         assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
184         ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
185         assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
186         assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
187         assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
188
189         persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
190         assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
191         logEntry = persistedLogEntries.get(0);
192         assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
193         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
194         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
195                 logEntry.getData().getClass());
196     }
197
198     @Test
199     public void testAddServerWithNoExistingFollower() throws Exception {
200         RaftActorContext initialActorContext = new MockRaftActorContext();
201         initialActorContext.setCommitIndex(1);
202         initialActorContext.setLastApplied(1);
203         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
204                 0, 2, 1).build());
205
206         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
207                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
208                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
209                 actorFactory.generateActorId(LEADER_ID));
210
211         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
212         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
213
214         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
215
216         // Leader should install snapshot - capture and verify ApplySnapshot contents
217
218         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
219         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
220         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
221
222         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
223         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
224         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
225
226         // Verify ServerConfigurationPayload entry in leader's log
227
228         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
229         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
230         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
231         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
232                 votingServer(NEW_SERVER_ID));
233
234         // Verify ServerConfigurationPayload entry in the new follower
235
236         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
237         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
238         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
239                 votingServer(NEW_SERVER_ID));
240
241         // Verify new server config was applied in the new follower
242
243         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
244     }
245
246     @Test
247     public void testAddServersAsNonVoting() throws Exception {
248         RaftActorContext initialActorContext = new MockRaftActorContext();
249         initialActorContext.setCommitIndex(-1);
250         initialActorContext.setLastApplied(-1);
251         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
252
253         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
254                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
255                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
256                 actorFactory.generateActorId(LEADER_ID));
257
258         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
259         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
260
261         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
262
263         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
264         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
265         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
266
267         // Verify ServerConfigurationPayload entry in leader's log
268
269         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
270         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
271         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
272         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
273                 nonVotingServer(NEW_SERVER_ID));
274
275         // Verify ServerConfigurationPayload entry in the new follower
276
277         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
278         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
279         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
280                 nonVotingServer(NEW_SERVER_ID));
281
282         // Verify new server config was applied in the new follower
283
284         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
285
286         MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
287
288         // Add another non-voting server.
289
290         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
291         Follower newFollower2 = new Follower(follower2ActorContext);
292         followerActor.underlyingActor().setBehavior(newFollower2);
293
294         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
295
296         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
297         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
298         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
299
300         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
301         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
302         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
303         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
304                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
305     }
306
307     @Test
308     public void testAddServerWithOperationInProgress() throws Exception {
309         RaftActorContext initialActorContext = new MockRaftActorContext();
310         initialActorContext.setCommitIndex(-1);
311         initialActorContext.setLastApplied(-1);
312         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
313
314         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
315                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
316                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
317                 actorFactory.generateActorId(LEADER_ID));
318
319         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
320         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
321
322         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
323         Follower newFollower2 = new Follower(follower2ActorContext);
324         followerActor.underlyingActor().setBehavior(newFollower2);
325
326         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
327         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
328
329         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
330
331         // Wait for leader's install snapshot and capture it
332
333         Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
334
335         // Send a second AddServer - should get queued
336         JavaTestKit testKit2 = new JavaTestKit(getSystem());
337         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
338
339         // Continue the first AddServer
340         newFollowerRaftActorInstance.setDropMessageOfType(null);
341         newFollowerRaftActor.tell(installSnapshot, leaderActor);
342
343         // Verify both complete successfully
344         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
345         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
346
347         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
348         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
349
350         // Verify ServerConfigurationPayload entries in leader's log
351
352         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
353         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
354         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
355         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
356                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
357
358         // Verify ServerConfigurationPayload entry in the new follower
359
360         MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
361
362         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
363                newFollowerActorContext.getPeerIds());
364     }
365
366     @Test
367     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
368         RaftActorContext initialActorContext = new MockRaftActorContext();
369         initialActorContext.setCommitIndex(-1);
370         initialActorContext.setLastApplied(-1);
371         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
372
373         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
374                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
375                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
376                 actorFactory.generateActorId(LEADER_ID));
377
378         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
379         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
380
381         TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
382                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
383                 actorFactory.generateActorId(LEADER_ID + "Collector"));
384         leaderRaftActor.setCollectorActor(leaderCollectorActor);
385
386         // Drop commit message for now to delay snapshot completion
387         leaderRaftActor.setDropMessageOfType(String.class);
388
389         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
390
391         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
392
393         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
394
395         leaderRaftActor.setDropMessageOfType(null);
396         leaderActor.tell(commitMsg, leaderActor);
397
398         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
399         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
400         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
401
402         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
403
404         // Verify ServerConfigurationPayload entry in leader's log
405
406         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
407         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
408         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
409         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
410                 votingServer(NEW_SERVER_ID));
411     }
412
413     @Test
414     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
415         RaftActorContext initialActorContext = new MockRaftActorContext();
416         initialActorContext.setCommitIndex(-1);
417         initialActorContext.setLastApplied(-1);
418         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
419
420         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
421                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
422                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
423                 actorFactory.generateActorId(LEADER_ID));
424
425         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
426         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
427
428         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
429
430         TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
431                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
432                 actorFactory.generateActorId(LEADER_ID + "Collector"));
433         leaderRaftActor.setCollectorActor(leaderCollectorActor);
434
435         // Drop commit message so the snapshot doesn't complete.
436         leaderRaftActor.setDropMessageOfType(String.class);
437
438         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
439
440         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
441
442         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
443         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
444
445         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
446     }
447
448     @Test
449     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
450         RaftActorContext initialActorContext = new MockRaftActorContext();
451         initialActorContext.setCommitIndex(-1);
452         initialActorContext.setLastApplied(-1);
453         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
454
455         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
456                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
457                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
458                 actorFactory.generateActorId(LEADER_ID));
459
460         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
461         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
462         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
463
464         TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
465                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
466                 actorFactory.generateActorId(LEADER_ID + "Collector"));
467         leaderRaftActor.setCollectorActor(leaderCollectorActor);
468
469         // Drop the commit message so the snapshot doesn't complete yet.
470         leaderRaftActor.setDropMessageOfType(String.class);
471
472         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
473
474         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
475
476         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
477
478         // Change the leader behavior to follower
479         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
480
481         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
482         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
483         // isCapturing assertion below.
484         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
485
486         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
487         leaderActor.tell(commitMsg, leaderActor);
488
489         leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
490
491         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
492         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
493
494         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
495         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
496     }
497
498     @Test
499     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
500         RaftActorContext initialActorContext = new MockRaftActorContext();
501         initialActorContext.setCommitIndex(-1);
502         initialActorContext.setLastApplied(-1);
503         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
504
505         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
506                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
507                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
508                 actorFactory.generateActorId(LEADER_ID));
509
510         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
511         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
512
513         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
514
515         TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
516                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
517                 actorFactory.generateActorId(LEADER_ID + "Collector"));
518         leaderRaftActor.setCollectorActor(leaderCollectorActor);
519
520         // Drop the UnInitializedFollowerSnapshotReply to delay it.
521         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
522
523         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
524
525         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
526                 UnInitializedFollowerSnapshotReply.class);
527
528         // Prevent election timeout when the leader switches to follower
529         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
530
531         // Change the leader behavior to follower
532         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
533
534         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
535         leaderRaftActor.setDropMessageOfType(null);
536         leaderActor.tell(snapshotReply, leaderActor);
537
538         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
539         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
540
541         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
542     }
543
544     @Test
545     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
546         RaftActorContext initialActorContext = new MockRaftActorContext();
547         initialActorContext.setCommitIndex(-1);
548         initialActorContext.setLastApplied(-1);
549         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
550
551         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
552                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
553                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
554                 actorFactory.generateActorId(LEADER_ID));
555
556         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
557         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
558         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
559
560         // Drop the InstallSnapshot message so it times out
561         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
562
563         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
564
565         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
566
567         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
568         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
569
570         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
571         assertEquals("Leader followers size", 0,
572                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
573     }
574
575     @Test
576     public void testAddServerWithNoLeader() {
577         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
578         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
579
580         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
581                 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
582                         Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
583                 actorFactory.generateActorId(LEADER_ID));
584         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
585
586         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
587         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
588         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
589     }
590
591     @Test
592     public void testAddServerForwardedToLeader() {
593         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
594         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
595
596         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
597                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
598                 actorFactory.generateActorId(LEADER_ID));
599
600         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
601                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
602                         Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
603                 actorFactory.generateActorId(FOLLOWER_ID));
604         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
605
606         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
607                 -1, -1, (short)0), leaderActor);
608
609         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
610         expectFirstMatching(leaderActor, AddServer.class);
611     }
612
613     private ServerInfo votingServer(String id) {
614         return new ServerInfo(id, true);
615     }
616
617     private ServerInfo nonVotingServer(String id) {
618         return new ServerInfo(id, false);
619     }
620
621     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
622         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
623         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
624         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
625         assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
626     }
627
628     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
629         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
630         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
631         configParams.setElectionTimeoutFactor(100000);
632         ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
633         termInfo.update(1, LEADER_ID);
634         RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
635                 id, termInfo, -1, -1,
636                 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
637         followerActorContext.setCommitIndex(-1);
638         followerActorContext.setLastApplied(-1);
639         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
640
641         return followerActorContext;
642     }
643
644     static abstract class AbstractMockRaftActor extends MockRaftActor {
645         private volatile TestActorRef<MessageCollectorActor> collectorActor;
646         private volatile Class<?> dropMessageOfType;
647
648         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
649                 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
650             super(id, peerAddresses, config, dataPersistenceProvider);
651             this.collectorActor = collectorActor;
652         }
653
654         void setDropMessageOfType(Class<?> dropMessageOfType) {
655             this.dropMessageOfType = dropMessageOfType;
656         }
657
658         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
659             this.collectorActor = collectorActor;
660         }
661
662         @Override
663         public void handleCommand(Object message) {
664             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
665                 super.handleCommand(message);
666             }
667
668             if(collectorActor != null) {
669                 collectorActor.tell(message, getSender());
670             }
671         }
672     }
673
674     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
675         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
676                 RaftActorContext fromContext) {
677             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
678             setPersistence(false);
679
680             RaftActorContext context = getRaftActorContext();
681             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
682                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
683                 getState().add(entry.getData());
684                 context.getReplicatedLog().append(entry);
685             }
686
687             context.setCommitIndex(fromContext.getCommitIndex());
688             context.setLastApplied(fromContext.getLastApplied());
689             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
690                     fromContext.getTermInformation().getVotedFor());
691         }
692
693         @Override
694         protected void initializeBehavior() {
695             changeCurrentBehavior(new Leader(getRaftActorContext()));
696             initializeBehaviorComplete.countDown();
697         }
698
699         @Override
700         public void createSnapshot(ActorRef actorRef) {
701             try {
702                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
703             } catch (Exception e) {
704                 LOG.error("createSnapshot failed", e);
705             }
706         }
707
708         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
709             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
710             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
711             configParams.setElectionTimeoutFactor(10);
712             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
713         }
714     }
715
716     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
717         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
718             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
719             setPersistence(false);
720         }
721
722         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
723             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
724         }
725     }
726 }