Bug 4564: Implement restore from snapshot in RaftActor
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Dispatchers;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import com.google.common.base.Optional;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.DataPersistenceProvider;
33 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
34 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
37 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
38 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
39 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
40 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
41 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
42 import org.opendaylight.controller.cluster.raft.messages.AddServer;
43 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
45 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
46 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
47 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
48 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
49 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
50 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
51 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
52 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
53 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.duration.FiniteDuration;
57
58 /**
59  * Unit tests for RaftActorServerConfigurationSupport.
60  *
61  * @author Thomas Pantelis
62  */
63 public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
64     static final String LEADER_ID = "leader";
65     static final String FOLLOWER_ID = "follower";
66     static final String NEW_SERVER_ID = "new-server";
67     static final String NEW_SERVER_ID2 = "new-server2";
68     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
69     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
70
       // Creates every test actor used by this class; closed in tearDown().
71     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
72
       // Stand-in for an already-known peer: tests install a Follower behavior on it via setBehavior()
       // and also use it as a message collector (expectFirstMatching/clearMessages).
73     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
74             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
75             actorFactory.generateActorId(FOLLOWER_ID));
76
       // The server that the tests add to the leader, the collector passed to its props, and its
       // context. All three are assigned in setup().
77     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
78     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
79     private RaftActorContext newFollowerActorContext;
80
       // Used as the sender of AddServer so that the AddServerReply can be asserted with expectMsgClass().
81     private final JavaTestKit testKit = new JavaTestKit(getSystem());
82
83     @Before
84     public void setup() {
85         InMemoryJournal.clear();
86         InMemorySnapshotStore.clear();
87
88         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
89
90         newFollowerCollectorActor = actorFactory.createTestActor(
91                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
92                 actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
93         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
94                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
95                 actorFactory.generateActorId(NEW_SERVER_ID));
96
97         try {
98             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
99         } catch (Exception e) {
100             newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
101         }
102     }
103
104     private static DefaultConfigParamsImpl newFollowerConfigParams() {
105         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
106         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
107         configParams.setElectionTimeoutFactor(100000);
108         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
109         return configParams;
110     }
111
112     @After
113     public void tearDown() throws Exception {
            // Closes the factory that created all of this test's actors.
114         actorFactory.close();
115     }
116
117     @Test
118     public void testAddServerWithExistingFollower() throws Exception {
            // Scenario: a leader that already has one follower is asked to add NEW_SERVER_ID (third
            // AddServer argument true; the resulting config entry is verified with votingServer()).
            // Checks the snapshot applied on the new server, the AddServerReply, the
            // ServerConfigurationPayload entry and indexes on all three nodes, the resulting peer sets,
            // and the journal entries persisted for LEADER_ID and NEW_SERVER_ID.
119         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
            // Pre-populate the follower log. Presumably this yields indexes 0-2 in term 1 (the config
            // entry is later asserted at index 3, term 1) - TODO confirm createEntries() arguments.
120         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
121                 0, 3, 1).build());
122         followerActorContext.setCommitIndex(2);
123         followerActorContext.setLastApplied(2);
124
125         Follower follower = new Follower(followerActorContext);
126         followerActor.underlyingActor().setBehavior(follower);
127
            // The follower's context is also passed to MockLeaderRaftActor.props(); how the mock leader
            // uses it is defined outside this view.
128         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
129                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
130                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
131                 actorFactory.generateActorId(LEADER_ID));
132
133         // Expect initial heartbeat from the leader.
134         expectFirstMatching(followerActor, AppendEntries.class);
135         clearMessages(followerActor);
136
137         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
138         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
139
140         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
141
142         // Leader should install snapshot - capture and verify ApplySnapshot contents
143
144         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
145         @SuppressWarnings("unchecked")
146         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
            // NOTE(review): JUnit's assertEquals is (message, expected, actual); here the received
            // snapshot state sits in the "expected" slot and the leader's state in "actual". This only
            // affects the wording of a failure message.
147         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
148
149         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
150         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
151         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
152
153         // Verify ServerConfigurationPayload entry in leader's log
154
155         expectFirstMatching(leaderCollectorActor, ApplyState.class);
156         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
157         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
158         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
159         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
160         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
161                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
162
163         // Verify ServerConfigurationPayload entry in both followers
164
165         expectFirstMatching(followerActor, ApplyState.class);
166         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
167         verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
168                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
169
170         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
171         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
172         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
173                 votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
174
175         // Verify new server config was applied in both followers
176
177         assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
178
179         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
180
            // NOTE(review): these two waits repeat the ones above without an intervening
            // clearMessages(), so they presumably match the ApplyState messages already collected -
            // confirm whether they are redundant.
181         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
182         expectFirstMatching(followerActor, ApplyState.class);
183
184         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
185         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
186         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
187         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
188
            // Exactly one ReplicatedLogImplEntry (the ServerConfigurationPayload at index 3, term 1)
            // is expected in the in-memory journal for the leader and for the new server.
189         List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
190         assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
191         ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
192         assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
193         assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
194         assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
195
196         persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
197         assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
198         logEntry = persistedLogEntries.get(0);
199         assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
200         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
201         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
202                 logEntry.getData().getClass());
203     }
204
205     @Test
206     public void testAddServerWithNoExistingFollower() throws Exception {
            // Scenario: a leader created with an empty peer map is asked to add NEW_SERVER_ID (third
            // AddServer argument true). Checks the snapshot applied on the new server, the
            // AddServerReply, and that the ServerConfigurationPayload entry lands at index 2 on both
            // the leader and the new server.
207         RaftActorContext initialActorContext = new MockRaftActorContext();
208         initialActorContext.setCommitIndex(1);
209         initialActorContext.setLastApplied(1);
            // Presumably creates entries at indexes 0-1 in term 1 (the config entry is asserted at
            // index 2 below) - TODO confirm createEntries() arguments.
210         initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
211                 0, 2, 1).build());
212
213         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
214                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
215                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
216                 actorFactory.generateActorId(LEADER_ID));
217
218         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
219         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
220
221         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
222
223         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
224
225         // Leader should install snapshot - capture and verify ApplySnapshot contents
226
227         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
228         @SuppressWarnings("unchecked")
229         List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
            // NOTE(review): arguments are in (message, actual, expected) order relative to JUnit's
            // assertEquals(message, expected, actual); only a failure message is affected.
230         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
231
232         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
233         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
234         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
235
236         // Verify ServerConfigurationPayload entry in leader's log
237
238         expectFirstMatching(leaderCollectorActor, ApplyState.class);
239         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
240         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
241         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
242         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
243                 votingServer(NEW_SERVER_ID));
244
245         // Verify ServerConfigurationPayload entry in the new follower
246
247         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
248         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
249         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
250                 votingServer(NEW_SERVER_ID));
251
252         // Verify new server config was applied in the new follower
253
254         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
255     }
256
257     @Test
258     public void testAddServersAsNonVoting() throws Exception {
            // Scenario: two servers are added one after the other with the third AddServer argument
            // false; the resulting config entries are verified with nonVotingServer(). The first add
            // targets the new follower actor, the second targets followerActor acting as NEW_SERVER_ID2.
259         RaftActorContext initialActorContext = new MockRaftActorContext();
260
261         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
262                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
263                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
264                 actorFactory.generateActorId(LEADER_ID));
265
266         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
267         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
268
269         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
270
271         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
272
273         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
274         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
275         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
276
277         // Verify ServerConfigurationPayload entry in leader's log
278
279         expectFirstMatching(leaderCollectorActor, ApplyState.class);
280
281         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
282         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
283         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
284         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
285                 nonVotingServer(NEW_SERVER_ID));
286
287         // Verify ServerConfigurationPayload entry in the new follower
288
289         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
290         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
291         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
292                 nonVotingServer(NEW_SERVER_ID));
293
294         // Verify new server config was applied in the new follower
295
296         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
297
            // No InstallSnapshot may have been collected for the new follower. The 500 is presumably
            // a wait in milliseconds - TODO confirm against MessageCollectorActor.assertNoneMatching().
298         assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
299
300         // Add another non-voting server.
301
            // Clear the leader's collector so the ApplyState wait below sees only the second add.
302         clearMessages(leaderCollectorActor);
303
            // Re-purpose followerActor as NEW_SERVER_ID2 by installing a Follower behavior on it.
304         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
305         Follower newFollower2 = new Follower(follower2ActorContext);
306         followerActor.underlyingActor().setBehavior(newFollower2);
307
308         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
309
310         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
311         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
312         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
313
314         expectFirstMatching(leaderCollectorActor, ApplyState.class);
315         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
316         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
317         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
318         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
319                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
320     }
321
322     @Test
323     public void testAddServerWithOperationInProgress() throws Exception {
            // Scenario: a second AddServer arrives while the first one is still waiting on its
            // snapshot install. The first add is stalled by making the new follower drop
            // InstallSnapshot, then resumed by re-sending the captured message. Both requests must be
            // answered with OK and both config changes must appear in the leader's log.
324         RaftActorContext initialActorContext = new MockRaftActorContext();
325
326         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
327                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
328                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
329                 actorFactory.generateActorId(LEADER_ID));
330
331         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
332         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
333
334         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
335
            // followerActor plays the role of NEW_SERVER_ID2, the target of the second AddServer.
336         RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
337         Follower newFollower2 = new Follower(follower2ActorContext);
338         followerActor.underlyingActor().setBehavior(newFollower2);
339
            // Make the new follower drop InstallSnapshot so the first AddServer cannot complete yet.
340         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
341         newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
342
343         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
344
345         // Wait for leader's install snapshot and capture it
346
347         Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
348
349         // Send a second AddServer - should get queued
            // A separate test kit is the sender so the two replies can be asserted independently.
350         JavaTestKit testKit2 = new JavaTestKit(getSystem());
351         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
352
353         // Continue the first AddServer
            // The captured message is re-sent with leaderActor as the sender.
354         newFollowerRaftActorInstance.setDropMessageOfType(null);
355         newFollowerRaftActor.tell(installSnapshot, leaderActor);
356
357         // Verify both complete successfully
358         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
359         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
360
361         addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
362         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
363
364         // Verify ServerConfigurationPayload entries in leader's log
365
            // Two ApplyState messages are awaited, one per config change (indexes 0 and 1).
366         expectMatching(leaderCollectorActor, ApplyState.class, 2);
367         assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
368         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
369         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
370         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
371                 votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
372
373         // Verify ServerConfigurationPayload entry in the new follower
374
375         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
376         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
377                newFollowerActorContext.getPeerIds());
378     }
379
380     @Test
381     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
            // Scenario: AddServer arrives while a snapshot started by InitiateCaptureSnapshot has not
            // completed. The leader drops String messages (the snapshot commit message, per the comment
            // below) until the test re-sends the captured one; the add must then finish with OK.
382         RaftActorContext initialActorContext = new MockRaftActorContext();
383
384         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
385                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
386                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
387                 actorFactory.generateActorId(LEADER_ID));
388
389         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
390         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
391
392         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
393
394         // Drop commit message for now to delay snapshot completion
395         leaderRaftActor.setDropMessageOfType(String.class);
396
397         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
398
            // Capture the dropped commit message from the leader's collector for replay later.
399         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
400
401         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
402
            // Stop dropping and replay the commit message so the prior snapshot can complete.
403         leaderRaftActor.setDropMessageOfType(null);
404         leaderActor.tell(commitMsg, leaderActor);
405
406         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
407         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
408         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
409
410         expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
411
412         // Verify ServerConfigurationPayload entry in leader's log
413
414         expectFirstMatching(leaderCollectorActor, ApplyState.class);
415         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
416         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
417         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
418         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
419                 votingServer(NEW_SERVER_ID));
420     }
421
422     @Test
423     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
            // Scenario: a snapshot started by InitiateCaptureSnapshot never completes because the
            // leader keeps dropping String messages (the commit message, per the comment below). The
            // AddServer sent afterwards must be answered with TIMEOUT and the leader must have no peers.
424         RaftActorContext initialActorContext = new MockRaftActorContext();
425
426         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
427                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
428                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
429                 actorFactory.generateActorId(LEADER_ID));
430
431         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
432         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
433
            // Presumably shortens the interval after which the leader gives up waiting, so the
            // TIMEOUT reply arrives within the 5 second expectation - TODO confirm how the timeout is
            // derived from the election timeout factor.
434         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
435
436         // Drop commit message so the snapshot doesn't complete.
437         leaderRaftActor.setDropMessageOfType(String.class);
438
439         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
440
441         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
442
443         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
444         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
445
446         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
447     }
448
449     @Test
450     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
            // Scenario: AddServer is pending behind an unfinished snapshot when the actor is switched
            // to a Follower behavior. After the prior snapshot's commit message is replayed and a
            // FollowerCatchUpTimeout is sent, the request must be answered with NO_LEADER, no peer may
            // have been added and no snapshot capture may be in progress.
451         RaftActorContext initialActorContext = new MockRaftActorContext();
452
453         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
454                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
455                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
456                 actorFactory.generateActorId(LEADER_ID));
457
458         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
459         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
            // NOTE(review): presumably a large factor keeps timeouts from firing during the test -
            // confirm.
460         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
461
462         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
463
464         // Drop the commit message so the snapshot doesn't complete yet.
465         leaderRaftActor.setDropMessageOfType(String.class);
466
467         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
468
469         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
470
            // Capture the dropped commit message so it can be replayed after the behavior change.
471         String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
472
473         // Change the leader behavior to follower
474         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
475
476         // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
477         // snapshot completes. This will prevent the invalid snapshot from completing and fail the
478         // isCapturing assertion below.
479         leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
480
481         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
482         leaderActor.tell(commitMsg, leaderActor);
483
            // The catch-up timeout is sent explicitly by the test rather than waited for.
484         leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
485
486         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
487         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
488
489         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
490         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
491     }
492
493     @Test
494     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
            // Scenario: the actor is switched to a Follower behavior while the snapshot install for
            // the server being added is still outstanding. The UnInitializedFollowerSnapshotReply is
            // intercepted, replayed after the behavior change, and the request must be answered with
            // NO_LEADER without any peer having been added.
495         RaftActorContext initialActorContext = new MockRaftActorContext();
496
497         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
498                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
499                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
500                 actorFactory.generateActorId(LEADER_ID));
501
502         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
503         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
504
            // NOTE(review): the reason for the specific value 8 is not evident from this file - confirm.
505         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
506
507         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
508
509         // Drop the UnInitializedFollowerSnapshotReply to delay it.
510         leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
511
512         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
513
            // Capture the dropped reply from the leader's collector so it can be replayed below.
514         UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
515                 UnInitializedFollowerSnapshotReply.class);
516
517         // Prevent election timeout when the leader switches to follower
518         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
519
520         // Change the leader behavior to follower
521         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
522
523         // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
524         leaderRaftActor.setDropMessageOfType(null);
525         leaderActor.tell(snapshotReply, leaderActor);
526
527         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
528         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
529
530         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
531     }
532
533     @Test
534     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
            // Scenario: the server being added drops every InstallSnapshot, so the install never
            // completes. The AddServer must be answered with TIMEOUT, and afterwards the leader must
            // have neither a peer nor a follower entry for the new server.
535         RaftActorContext initialActorContext = new MockRaftActorContext();
536
537         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
538                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
539                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
540                 actorFactory.generateActorId(LEADER_ID));
541
542         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
543         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
            // Presumably shortens the wait so the TIMEOUT reply arrives within the 5 second
            // expectation - TODO confirm how the timeout is derived from the election timeout factor.
544         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
545
546         // Drop the InstallSnapshot message so it times out
547         newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
548
549         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
550
            // A snapshot reply carrying an id ("bogus") that is not the server being added.
            // NOTE(review): presumably this checks that a reply for an unknown follower is ignored -
            // confirm.
551         leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
552
553         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
554         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
555
556         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
557         assertEquals("Leader followers size", 0,
558                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
559     }
560
561     @Test
562     public void testAddServerWithNoLeader() {
563         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
564         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
565
566         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
567                 MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
568                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
569                 actorFactory.generateActorId(LEADER_ID));
570         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
571
572         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
573         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
574         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
575     }
576
577     @Test
578     public void testAddServerWithNoConsensusReached() {
579         RaftActorContext initialActorContext = new MockRaftActorContext();
580
581         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
582                 MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
583                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
584                 actorFactory.generateActorId(LEADER_ID));
585
586         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
587         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
588
589         newFollowerRaftActor.underlyingActor().setDropMessageOfType(AppendEntries.class);
590
591         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
592
593         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
594         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
595         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
596
597         // Verify ServerConfigurationPayload entry in leader's log
598
599         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
600                 votingServer(NEW_SERVER_ID));
601     }
602
603     @Test
604     public void testAddServerWithExistingServer() {
605         RaftActorContext initialActorContext = new MockRaftActorContext();
606
607         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
608                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
609                         initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
610                 actorFactory.generateActorId(LEADER_ID));
611
612         leaderActor.tell(new AddServer(FOLLOWER_ID, followerActor.path().toString(), true), testKit.getRef());
613
614         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
615         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
616     }
617
618     @Test
619     public void testAddServerForwardedToLeader() {
620         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
621         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
622
623         TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
624                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
625                 actorFactory.generateActorId(LEADER_ID));
626
627         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
628                 MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
629                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
630                 actorFactory.generateActorId(FOLLOWER_ID));
631         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
632
633         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
634                 -1, -1, (short)0), leaderActor);
635
636         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
637         expectFirstMatching(leaderActor, AddServer.class);
638     }
639
640     @Test
641     public void testOnApplyState() {
642         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(new MockRaftActorContext());
643
644         ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
645                 new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
646         boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), null, ActorRef.noSender());
647         assertEquals("Message handled", true, handled);
648
649         ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
650                 new MockRaftActorContext.MockPayload("1"));
651         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), null, ActorRef.noSender());
652         assertEquals("Message handled", false, handled);
653     }
654
655     private ServerInfo votingServer(String id) {
656         return new ServerInfo(id, true);
657     }
658
659     private ServerInfo nonVotingServer(String id) {
660         return new ServerInfo(id, false);
661     }
662
663     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
664         TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
665                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
666                 actorFactory.generateActorId(LEADER_ID + "Collector"));
667         leaderRaftActor.setCollectorActor(leaderCollectorActor);
668         return leaderCollectorActor;
669     }
670
671     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
672         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
673         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
674         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
675         assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
676     }
677
678     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
679         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
680         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
681         configParams.setElectionTimeoutFactor(100000);
682         ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
683         termInfo.update(1, LEADER_ID);
684         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
685                 id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
686     }
687
688     static abstract class AbstractMockRaftActor extends MockRaftActor {
689         private volatile TestActorRef<MessageCollectorActor> collectorActor;
690         private volatile Class<?> dropMessageOfType;
691
692         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
693                 DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
694             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
695                     dataPersistenceProvider(dataPersistenceProvider));
696             this.collectorActor = collectorActor;
697         }
698
699         void setDropMessageOfType(Class<?> dropMessageOfType) {
700             this.dropMessageOfType = dropMessageOfType;
701         }
702
703         void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
704             this.collectorActor = collectorActor;
705         }
706
707         @Override
708         public void handleCommand(Object message) {
709             if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
710                 super.handleCommand(message);
711             }
712
713             if(collectorActor != null) {
714                 collectorActor.tell(message, getSender());
715             }
716         }
717     }
718
719     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
720         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
721                 RaftActorContext fromContext) {
722             super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
723             setPersistence(false);
724
725             RaftActorContext context = getRaftActorContext();
726             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
727                 ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
728                 getState().add(entry.getData());
729                 context.getReplicatedLog().append(entry);
730             }
731
732             context.setCommitIndex(fromContext.getCommitIndex());
733             context.setLastApplied(fromContext.getLastApplied());
734             context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
735                     fromContext.getTermInformation().getVotedFor());
736         }
737
738         @Override
739         protected void initializeBehavior() {
740             changeCurrentBehavior(new Leader(getRaftActorContext()));
741             initializeBehaviorComplete.countDown();
742         }
743
744         @Override
745         public void createSnapshot(ActorRef actorRef) {
746             try {
747                 actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
748             } catch (Exception e) {
749                 LOG.error("createSnapshot failed", e);
750             }
751         }
752
753         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
754             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
755             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
756             configParams.setElectionTimeoutFactor(10);
757             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
758         }
759     }
760
761     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
762         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
763             super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
764             setPersistence(false);
765         }
766
767         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
768             return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
769         }
770     }
771 }