c47ca5cec1f93273a93c744125c12223d0fe39dd
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertArrayEquals;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNotSame;
15 import static org.junit.Assert.assertNull;
16 import static org.junit.Assert.assertSame;
17 import static org.junit.Assert.assertTrue;
18 import static org.mockito.Matchers.any;
19 import static org.mockito.Matchers.anyObject;
20 import static org.mockito.Matchers.eq;
21 import static org.mockito.Matchers.same;
22 import static org.mockito.Mockito.doNothing;
23 import static org.mockito.Mockito.doReturn;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.never;
26 import static org.mockito.Mockito.reset;
27 import static org.mockito.Mockito.timeout;
28 import static org.mockito.Mockito.verify;
29 import akka.actor.ActorRef;
30 import akka.actor.PoisonPill;
31 import akka.actor.Props;
32 import akka.actor.Status.Failure;
33 import akka.actor.Terminated;
34 import akka.dispatch.Dispatchers;
35 import akka.japi.Procedure;
36 import akka.persistence.SaveSnapshotFailure;
37 import akka.persistence.SaveSnapshotSuccess;
38 import akka.persistence.SnapshotMetadata;
39 import akka.persistence.SnapshotOffer;
40 import akka.persistence.SnapshotSelectionCriteria;
41 import akka.testkit.JavaTestKit;
42 import akka.testkit.TestActorRef;
43 import com.google.common.base.Optional;
44 import com.google.common.collect.ImmutableMap;
45 import com.google.common.util.concurrent.Uninterruptibles;
46 import com.google.protobuf.ByteString;
47 import java.io.ByteArrayOutputStream;
48 import java.io.ObjectOutputStream;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collections;
52 import java.util.HashMap;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.concurrent.TimeUnit;
56 import java.util.concurrent.TimeoutException;
57 import org.apache.commons.lang3.SerializationUtils;
58 import org.junit.After;
59 import org.junit.Before;
60 import org.junit.Test;
61 import org.mockito.ArgumentCaptor;
62 import org.opendaylight.controller.cluster.DataPersistenceProvider;
63 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
64 import org.opendaylight.controller.cluster.PersistentDataProvider;
65 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
66 import org.opendaylight.controller.cluster.notifications.RoleChanged;
67 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
68 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
70 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
71 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
73 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
74 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
75 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
76 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
77 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
78 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
79 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
80 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
81 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
82 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
83 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
84 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
85 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
86 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
87 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
88 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
89 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
90 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
93 import scala.concurrent.duration.Duration;
94 import scala.concurrent.duration.FiniteDuration;
95
96 public class RaftActorTest extends AbstractActorTest {
97
98     static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
99
100     private TestActorFactory factory;
101
102     @Before
103     public void setUp(){
104         factory = new TestActorFactory(getSystem());
105     }
106
107     @SuppressWarnings("unchecked")
108     private static DataPersistenceProvider mockPersistenceProvider() {
109         final DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
110         doReturn(false).when(dataPersistenceProvider).isRecoveryApplicable();
111         doReturn(0L).when(dataPersistenceProvider).getLastSequenceNumber();
112         doNothing().when(dataPersistenceProvider).saveSnapshot(any(Object.class));
113         doNothing().when(dataPersistenceProvider).persist(any(Object.class), any(Procedure.class));
114         doNothing().when(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
115         doNothing().when(dataPersistenceProvider).deleteMessages(0L);
116
117         return dataPersistenceProvider;
118     }
119
120     @After
121     public void tearDown() throws Exception {
122         factory.close();
123         InMemoryJournal.clear();
124         InMemorySnapshotStore.clear();
125     }
126
127     @Test
128     public void testConstruction() {
129         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
130     }
131
132     @Test
133     public void testFindLeaderWhenLeaderIsSelf(){
134         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
135         kit.waitUntilLeader();
136     }
137
138
139     @Test
140     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
141         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
142
143         new JavaTestKit(getSystem()) {{
144             String persistenceId = factory.generateActorId("follower-");
145
146             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
147
148             // Set the heartbeat interval high to essentially disable election otherwise the test
149             // may fail if the actor is switched to Leader and the commitIndex is set to the last
150             // log entry.
151             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
152
153             ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
154             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
155                     peerAddresses, config), persistenceId);
156
157             watch(followerActor);
158
159             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
160             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
161                     new MockRaftActorContext.MockPayload("E"));
162             snapshotUnappliedEntries.add(entry1);
163
164             int lastAppliedDuringSnapshotCapture = 3;
165             int lastIndexDuringSnapshotCapture = 4;
166
167             // 4 messages as part of snapshot, which are applied to state
168             ByteString snapshotBytes = fromObject(Arrays.asList(
169                     new MockRaftActorContext.MockPayload("A"),
170                     new MockRaftActorContext.MockPayload("B"),
171                     new MockRaftActorContext.MockPayload("C"),
172                     new MockRaftActorContext.MockPayload("D")));
173
174             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
175                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
176                     lastAppliedDuringSnapshotCapture, 1);
177             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
178
179             // add more entries after snapshot is taken
180             List<ReplicatedLogEntry> entries = new ArrayList<>();
181             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
182                     new MockRaftActorContext.MockPayload("F", 2));
183             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
184                     new MockRaftActorContext.MockPayload("G", 3));
185             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
186                     new MockRaftActorContext.MockPayload("H", 4));
187             entries.add(entry2);
188             entries.add(entry3);
189             entries.add(entry4);
190
191             int lastAppliedToState = 5;
192             int lastIndex = 7;
193
194             InMemoryJournal.addEntry(persistenceId, 5, entry2);
195             // 2 entries are applied to state besides the 4 entries in snapshot
196             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
197             InMemoryJournal.addEntry(persistenceId, 7, entry3);
198             InMemoryJournal.addEntry(persistenceId, 8, entry4);
199
200             // kill the actor
201             followerActor.tell(PoisonPill.getInstance(), null);
202             expectMsgClass(duration("5 seconds"), Terminated.class);
203
204             unwatch(followerActor);
205
206             //reinstate the actor
207             TestActorRef<MockRaftActor> ref = factory.createTestActor(
208                     MockRaftActor.props(persistenceId, peerAddresses, config));
209
210             MockRaftActor mockRaftActor = ref.underlyingActor();
211
212             mockRaftActor.waitForRecoveryComplete();
213
214             RaftActorContext context = mockRaftActor.getRaftActorContext();
215             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
216                     context.getReplicatedLog().size());
217             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
218             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
219             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
220             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
221             assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
222
223             mockRaftActor.waitForInitializeBehaviorComplete();
224
225             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
226         }};
227
228         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
229     }
230
231     @Test
232     public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
233         new JavaTestKit(getSystem()) {{
234             String persistenceId = factory.generateActorId("follower-");
235
236             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
237
238             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
239
240             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
241                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
242                     config, new NonPersistentDataProvider()), persistenceId);
243
244             MockRaftActor mockRaftActor = ref.underlyingActor();
245
246             mockRaftActor.waitForRecoveryComplete();
247
248             mockRaftActor.waitForInitializeBehaviorComplete();
249
250             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
251         }};
252     }
253
254     @Test
255     public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
256         new JavaTestKit(getSystem()) {{
257             String persistenceId = factory.generateActorId("follower-");
258             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
259             config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
260             config.setElectionTimeoutFactor(1);
261
262             InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
263
264             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
265                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
266                     config, new NonPersistentDataProvider()).
267                             withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
268
269             InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
270             List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
271             assertEquals("UpdateElectionTerm entries", 1, entries.size());
272             UpdateElectionTerm updateEntry = entries.get(0);
273
274             factory.killActor(ref, this);
275
276             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
277             ref = factory.createTestActor(MockRaftActor.props(persistenceId,
278                     ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
279                     new NonPersistentDataProvider()).
280                             withDispatcher(Dispatchers.DefaultDispatcherId()),
281                             factory.generateActorId("follower-"));
282
283             MockRaftActor actor = ref.underlyingActor();
284             actor.waitForRecoveryComplete();
285
286             RaftActorContext newContext = actor.getRaftActorContext();
287             assertEquals("electionTerm", updateEntry.getCurrentTerm(),
288                     newContext.getTermInformation().getCurrentTerm());
289             assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
290
291             entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
292             assertEquals("UpdateElectionTerm entries", 1, entries.size());
293         }};
294     }
295
296     @Test
297     public void testRaftActorForwardsToRaftActorRecoverySupport() {
298         String persistenceId = factory.generateActorId("leader-");
299
300         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
301
302         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
303
304         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
305                 Collections.<String, String>emptyMap(), config), persistenceId);
306
307         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
308
309         // Wait for akka's recovery to complete so it doesn't interfere.
310         mockRaftActor.waitForRecoveryComplete();
311
312         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
313         doReturn(false).when(mockSupport).handleRecoveryMessage(any(Object.class), any(PersistentDataProvider.class));
314         mockRaftActor.setRaftActorRecoverySupport(mockSupport);
315
316         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
317         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
318         mockRaftActor.handleRecover(snapshotOffer);
319
320         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
321                 1, new MockRaftActorContext.MockPayload("1", 5));
322         mockRaftActor.handleRecover(logEntry);
323
324         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
325         mockRaftActor.handleRecover(applyJournalEntries);
326
327         ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
328         mockRaftActor.handleRecover(applyLogEntries);
329
330         DeleteEntries deleteEntries = new DeleteEntries(1);
331         mockRaftActor.handleRecover(deleteEntries);
332
333         org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
334                 new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
335         mockRaftActor.handleRecover(deprecatedDeleteEntries);
336
337         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
338         mockRaftActor.handleRecover(updateElectionTerm);
339
340         org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
341                 new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
342         mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
343
344         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
345         verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
346         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
347         verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class));
348         verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
349         verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
350         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
351         verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
352     }
353
354     @Test
355     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
356         String persistenceId = factory.generateActorId("leader-");
357
358         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
359
360         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
361
362         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
363
364         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
365                 config(config).snapshotMessageSupport(mockSupport).props());
366
367         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
368
369         // Wait for akka's recovery to complete so it doesn't interfere.
370         mockRaftActor.waitForRecoveryComplete();
371
372         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
373         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
374         mockRaftActor.handleCommand(applySnapshot);
375
376         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
377         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
378         mockRaftActor.handleCommand(captureSnapshot);
379
380         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
381         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
382         mockRaftActor.handleCommand(captureSnapshotReply);
383
384         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
385         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
386         mockRaftActor.handleCommand(saveSnapshotSuccess);
387
388         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
389         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
390         mockRaftActor.handleCommand(saveSnapshotFailure);
391
392         doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
393                 any(ActorRef.class));
394         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
395
396         doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
397         mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
398
399         verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
400         verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
401         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
402         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
403         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
404         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
405                 any(ActorRef.class));
406         verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
407     }
408
409     @Test
410     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
411         new JavaTestKit(getSystem()) {
412             {
413                 String persistenceId = factory.generateActorId("leader-");
414
415                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
416
417                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
418
419                 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
420
421                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
422                         Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
423
424                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
425
426                 mockRaftActor.waitForInitializeBehaviorComplete();
427
428                 mockRaftActor.waitUntilLeader();
429
430                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
431
432                 verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
433
434             }
435
436         };
437     }
438
439     @Test
440     public void testApplyState() throws Exception {
441
442         new JavaTestKit(getSystem()) {
443             {
444                 String persistenceId = factory.generateActorId("leader-");
445
446                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
447
448                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
449
450                 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
451
452                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
453                         Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
454
455                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
456
457                 mockRaftActor.waitForInitializeBehaviorComplete();
458
459                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
460                         new MockRaftActorContext.MockPayload("F"));
461
462                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
463
464                 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
465
466             }
467         };
468     }
469
470     @Test
471     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
472         new JavaTestKit(getSystem()) {{
473             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
474                     Props.create(MessageCollectorActor.class));
475             MessageCollectorActor.waitUntilReady(notifierActor);
476
477             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
478             long heartBeatInterval = 100;
479             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
480             config.setElectionTimeoutFactor(20);
481
482             String persistenceId = factory.generateActorId("notifier-");
483
484             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
485                     config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
486                             new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
487                             persistenceId);
488
489             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
490
491
492             // check if the notifier got a role change from null to Follower
493             RoleChanged raftRoleChanged = matches.get(0);
494             assertEquals(persistenceId, raftRoleChanged.getMemberId());
495             assertNull(raftRoleChanged.getOldRole());
496             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
497
498             // check if the notifier got a role change from Follower to Candidate
499             raftRoleChanged = matches.get(1);
500             assertEquals(persistenceId, raftRoleChanged.getMemberId());
501             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
502             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
503
504             // check if the notifier got a role change from Candidate to Leader
505             raftRoleChanged = matches.get(2);
506             assertEquals(persistenceId, raftRoleChanged.getMemberId());
507             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
508             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
509
510             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
511                     notifierActor, LeaderStateChanged.class);
512
513             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
514             assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
515
516             notifierActor.underlyingActor().clear();
517
518             MockRaftActor raftActor = raftActorRef.underlyingActor();
519             final String newLeaderId = "new-leader";
520             final short newLeaderVersion = 6;
521             Follower follower = new Follower(raftActor.getRaftActorContext()) {
522                 @Override
523                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
524                     leaderId = newLeaderId;
525                     setLeaderPayloadVersion(newLeaderVersion);
526                     return this;
527                 }
528             };
529
530             raftActor.newBehavior(follower);
531
532             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
533             assertEquals(persistenceId, leaderStateChange.getMemberId());
534             assertEquals(null, leaderStateChange.getLeaderId());
535
536             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
537             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
538             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
539
540             notifierActor.underlyingActor().clear();
541
542             raftActor.handleCommand("any");
543
544             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
545             assertEquals(persistenceId, leaderStateChange.getMemberId());
546             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
547             assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
548
549             notifierActor.underlyingActor().clear();
550
551             raftActor.handleCommand("any");
552
553             Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
554             leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
555             assertNull(leaderStateChange);
556         }};
557     }
558
559     @Test
560     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
561         new JavaTestKit(getSystem()) {{
562             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
563             MessageCollectorActor.waitUntilReady(notifierActor);
564
565             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
566             long heartBeatInterval = 100;
567             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
568             config.setElectionTimeoutFactor(1);
569
570             String persistenceId = factory.generateActorId("notifier-");
571
572             factory.createActor(MockRaftActor.builder().id(persistenceId).
573                     peerAddresses(ImmutableMap.of("leader", "fake/path")).
574                     config(config).roleChangeNotifier(notifierActor).props());
575
576             List<RoleChanged> matches =  null;
577             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
578                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
579                 assertNotNull(matches);
580                 if(matches.size() == 3) {
581                     break;
582                 }
583                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
584             }
585
586             assertNotNull(matches);
587             assertEquals(2, matches.size());
588
589             // check if the notifier got a role change from null to Follower
590             RoleChanged raftRoleChanged = matches.get(0);
591             assertEquals(persistenceId, raftRoleChanged.getMemberId());
592             assertNull(raftRoleChanged.getOldRole());
593             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
594
595             // check if the notifier got a role change from Follower to Candidate
596             raftRoleChanged = matches.get(1);
597             assertEquals(persistenceId, raftRoleChanged.getMemberId());
598             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
599             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
600
601         }};
602     }
603
604     @Test
605     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
606         new JavaTestKit(getSystem()) {
607             {
608                 String persistenceId = factory.generateActorId("leader-");
609                 String follower1Id = factory.generateActorId("follower-");
610
611                 ActorRef followerActor1 =
612                         factory.createActor(Props.create(MessageCollectorActor.class));
613
614                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
615                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
616                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
617
618                 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
619
620                 Map<String, String> peerAddresses = new HashMap<>();
621                 peerAddresses.put(follower1Id, followerActor1.path().toString());
622
623                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
624                         MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
625
626                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
627
628                 leaderActor.getRaftActorContext().setCommitIndex(4);
629                 leaderActor.getRaftActorContext().setLastApplied(4);
630                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
631
632                 leaderActor.waitForInitializeBehaviorComplete();
633
634                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
635
636                 Leader leader = new Leader(leaderActor.getRaftActorContext());
637                 leaderActor.setCurrentBehavior(leader);
638                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
639
640                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
641                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
642
643                 assertEquals(8, leaderActor.getReplicatedLog().size());
644
645                 leaderActor.getRaftActorContext().getSnapshotManager()
646                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
647                                 new MockRaftActorContext.MockPayload("x")), 4);
648
649                 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
650
651                 assertEquals(8, leaderActor.getReplicatedLog().size());
652
653                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
654                 //fake snapshot on index 5
655                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
656
657                 assertEquals(8, leaderActor.getReplicatedLog().size());
658
659                 //fake snapshot on index 6
660                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
661                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
662                 assertEquals(8, leaderActor.getReplicatedLog().size());
663
664                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
665
666                 assertEquals(8, leaderActor.getReplicatedLog().size());
667
668                 ByteString snapshotBytes = fromObject(Arrays.asList(
669                         new MockRaftActorContext.MockPayload("foo-0"),
670                         new MockRaftActorContext.MockPayload("foo-1"),
671                         new MockRaftActorContext.MockPayload("foo-2"),
672                         new MockRaftActorContext.MockPayload("foo-3"),
673                         new MockRaftActorContext.MockPayload("foo-4")));
674
675                 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
676                         leader, Runtime.getRuntime().totalMemory());
677
678                 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
679
680                 // The commit is needed to complete the snapshot creation process
681                 leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
682
683                 // capture snapshot reply should remove the snapshotted entries only
684                 assertEquals(3, leaderActor.getReplicatedLog().size());
685                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
686
687                 // add another non-replicated entry
688                 leaderActor.getReplicatedLog().append(
689                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
690
691                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
692                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
693                 assertEquals(2, leaderActor.getReplicatedLog().size());
694                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
695
696             }
697         };
698     }
699
700     @Test
701     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
702         new JavaTestKit(getSystem()) {
703             {
704                 String persistenceId = factory.generateActorId("follower-");
705                 String leaderId = factory.generateActorId("leader-");
706
707
708                 ActorRef leaderActor1 =
709                         factory.createActor(Props.create(MessageCollectorActor.class));
710
711                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
712                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
713                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
714
715                 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
716
717                 Map<String, String> peerAddresses = new HashMap<>();
718                 peerAddresses.put(leaderId, leaderActor1.path().toString());
719
720                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
721                         MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
722
723                 MockRaftActor followerActor = mockActorRef.underlyingActor();
724                 followerActor.getRaftActorContext().setCommitIndex(4);
725                 followerActor.getRaftActorContext().setLastApplied(4);
726                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
727
728                 followerActor.waitForInitializeBehaviorComplete();
729
730
731                 Follower follower = new Follower(followerActor.getRaftActorContext());
732                 followerActor.setCurrentBehavior(follower);
733                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
734
735                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
736                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
737                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
738
739                 // log has indices 0-5
740                 assertEquals(6, followerActor.getReplicatedLog().size());
741
742                 //snapshot on 4
743                 followerActor.getRaftActorContext().getSnapshotManager().capture(
744                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
745                                 new MockRaftActorContext.MockPayload("D")), 4);
746
747                 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
748
749                 assertEquals(6, followerActor.getReplicatedLog().size());
750
751                 //fake snapshot on index 6
752                 List<ReplicatedLogEntry> entries =
753                         Arrays.asList(
754                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
755                                         new MockRaftActorContext.MockPayload("foo-6"))
756                         );
757                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
758                 assertEquals(7, followerActor.getReplicatedLog().size());
759
760                 //fake snapshot on index 7
761                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
762
763                 entries =
764                         Arrays.asList(
765                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
766                                         new MockRaftActorContext.MockPayload("foo-7"))
767                         );
768                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
769                 assertEquals(8, followerActor.getReplicatedLog().size());
770
771                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
772
773
774                 ByteString snapshotBytes = fromObject(Arrays.asList(
775                         new MockRaftActorContext.MockPayload("foo-0"),
776                         new MockRaftActorContext.MockPayload("foo-1"),
777                         new MockRaftActorContext.MockPayload("foo-2"),
778                         new MockRaftActorContext.MockPayload("foo-3"),
779                         new MockRaftActorContext.MockPayload("foo-4")));
780                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
781                 assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
782
783                 // The commit is needed to complete the snapshot creation process
784                 followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
785
786                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
787                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
788                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
789
790                 entries =
791                         Arrays.asList(
792                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
793                                         new MockRaftActorContext.MockPayload("foo-7"))
794                         );
795                 // send an additional entry 8 with leaderCommit = 7
796                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
797
798                 // 7 and 8, as lastapplied is 7
799                 assertEquals(2, followerActor.getReplicatedLog().size());
800
801             }
802         };
803     }
804
805     @Test
806     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
807         new JavaTestKit(getSystem()) {
808             {
809                 String persistenceId = factory.generateActorId("leader-");
810                 String follower1Id = factory.generateActorId("follower-");
811                 String follower2Id = factory.generateActorId("follower-");
812
813                 ActorRef followerActor1 =
814                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
815                 ActorRef followerActor2 =
816                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
817
818                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
819                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
820                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
821
822                 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
823
824                 Map<String, String> peerAddresses = new HashMap<>();
825                 peerAddresses.put(follower1Id, followerActor1.path().toString());
826                 peerAddresses.put(follower2Id, followerActor2.path().toString());
827
828                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
829                         MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
830
831                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
832                 leaderActor.getRaftActorContext().setCommitIndex(9);
833                 leaderActor.getRaftActorContext().setLastApplied(9);
834                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
835
836                 leaderActor.waitForInitializeBehaviorComplete();
837
838                 Leader leader = new Leader(leaderActor.getRaftActorContext());
839                 leaderActor.setCurrentBehavior(leader);
840                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
841
842                 // create 5 entries in the log
843                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
844                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
845
846                 //set the snapshot index to 4 , 0 to 4 are snapshotted
847                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
848                 //setting replicatedToAllIndex = 9, for the log to clear
849                 leader.setReplicatedToAllIndex(9);
850                 assertEquals(5, leaderActor.getReplicatedLog().size());
851                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
852
853                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
854                 assertEquals(5, leaderActor.getReplicatedLog().size());
855                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
856
857                 // set the 2nd follower nextIndex to 1 which has been snapshotted
858                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
859                 assertEquals(5, leaderActor.getReplicatedLog().size());
860                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
861
862                 // simulate a real snapshot
863                 leaderActor.onReceiveCommand(new SendHeartBeat());
864                 assertEquals(5, leaderActor.getReplicatedLog().size());
865                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
866                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
867                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
868
869
870                 //reply from a slow follower does not initiate a fake snapshot
871                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
872                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
873
874                 ByteString snapshotBytes = fromObject(Arrays.asList(
875                         new MockRaftActorContext.MockPayload("foo-0"),
876                         new MockRaftActorContext.MockPayload("foo-1"),
877                         new MockRaftActorContext.MockPayload("foo-2"),
878                         new MockRaftActorContext.MockPayload("foo-3"),
879                         new MockRaftActorContext.MockPayload("foo-4")));
880                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
881                 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
882
883                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
884
885                 //reply from a slow follower after should not raise errors
886                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
887                 assertEquals(0, leaderActor.getReplicatedLog().size());
888             }
889         };
890     }
891
892     @Test
893     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
894         new JavaTestKit(getSystem()) {{
895             String persistenceId = factory.generateActorId("leader-");
896             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
897             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
898             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
899             config.setSnapshotBatchCount(5);
900
901             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
902
903             Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
904
905             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
906                     MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
907
908             MockRaftActor leaderActor = mockActorRef.underlyingActor();
909             leaderActor.getRaftActorContext().setCommitIndex(3);
910             leaderActor.getRaftActorContext().setLastApplied(3);
911             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
912
913             leaderActor.waitForInitializeBehaviorComplete();
914             for(int i=0;i< 4;i++) {
915                 leaderActor.getReplicatedLog()
916                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
917                                 new MockRaftActorContext.MockPayload("A")));
918             }
919
920             Leader leader = new Leader(leaderActor.getRaftActorContext());
921             leaderActor.setCurrentBehavior(leader);
922             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
923
924             // Simulate an install snaphost to a follower.
925             leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
926                     leaderActor.getReplicatedLog().last(), -1, "member1");
927
928             // Now send a CaptureSnapshotReply
929             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
930
931             // Trimming log in this scenario is a no-op
932             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
933             assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
934             assertEquals(-1, leader.getReplicatedToAllIndex());
935
936         }};
937     }
938
939     @Test
940     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
941         new JavaTestKit(getSystem()) {{
942             String persistenceId = factory.generateActorId("leader-");
943             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
944             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
945             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
946             config.setSnapshotBatchCount(5);
947
948             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
949
950             Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
951
952             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
953                     MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
954
955             MockRaftActor leaderActor = mockActorRef.underlyingActor();
956             leaderActor.getRaftActorContext().setCommitIndex(3);
957             leaderActor.getRaftActorContext().setLastApplied(3);
958             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
959             leaderActor.getReplicatedLog().setSnapshotIndex(3);
960
961             leaderActor.waitForInitializeBehaviorComplete();
962             Leader leader = new Leader(leaderActor.getRaftActorContext());
963             leaderActor.setCurrentBehavior(leader);
964             leader.setReplicatedToAllIndex(3);
965             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
966
967             // Persist another entry (this will cause a CaptureSnapshot to be triggered
968             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
969
970             // Now send a CaptureSnapshotReply
971             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
972
973             // Trimming log in this scenario is a no-op
974             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
975             assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
976             assertEquals(3, leader.getReplicatedToAllIndex());
977
978         }};
979     }
980
981     @Test
982     public void testRaftActorOnRecoverySnapshot() throws Exception {
983         TEST_LOG.info("testRaftActorOnRecoverySnapshot");
984
985         new JavaTestKit(getSystem()) {{
986                 String persistenceId = factory.generateActorId("follower-");
987
988                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
989
990                 // Set the heartbeat interval high to essentially disable election otherwise the test
991                 // may fail if the actor is switched to Leader
992                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
993
994                 ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
995
996                 // Create mock ReplicatedLogEntry
997                 ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
998                         new MockRaftActorContext.MockPayload("F", 1));
999
1000                 InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
1001
1002                 TestActorRef<MockRaftActor> ref = factory.createTestActor(
1003                         MockRaftActor.props(persistenceId, peerAddresses, config));
1004
1005                 MockRaftActor mockRaftActor = ref.underlyingActor();
1006
1007                 mockRaftActor.waitForRecoveryComplete();
1008
1009                 mockRaftActor.waitForInitializeBehaviorComplete();
1010
1011                 verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
1012             }};
1013     }
1014
1015     @Test
1016     public void testSwitchBehavior(){
1017         String persistenceId = factory.generateActorId("leader-");
1018         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1019         config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
1020         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1021         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1022         config.setSnapshotBatchCount(5);
1023
1024         DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1025
1026         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
1027
1028         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1029                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
1030
1031         MockRaftActor leaderActor = mockActorRef.underlyingActor();
1032
1033         leaderActor.waitForRecoveryComplete();
1034
1035         leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
1036
1037         assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1038         assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
1039
1040         leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
1041
1042         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1043         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1044
1045         leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
1046
1047         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1048         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1049
1050         leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
1051
1052         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1053         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1054     }
1055
1056     public static ByteString fromObject(Object snapshot) throws Exception {
1057         ByteArrayOutputStream b = null;
1058         ObjectOutputStream o = null;
1059         try {
1060             b = new ByteArrayOutputStream();
1061             o = new ObjectOutputStream(b);
1062             o.writeObject(snapshot);
1063             byte[] snapshotBytes = b.toByteArray();
1064             return ByteString.copyFrom(snapshotBytes);
1065         } finally {
1066             if (o != null) {
1067                 o.flush();
1068                 o.close();
1069             }
1070             if (b != null) {
1071                 b.close();
1072             }
1073         }
1074     }
1075
1076     @Test
1077     public void testUpdateConfigParam() throws Exception {
1078         DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
1079         String persistenceId = factory.generateActorId("follower-");
1080         ImmutableMap<String, String> peerAddresses =
1081             ImmutableMap.<String, String>builder().put("member1", "address").build();
1082         DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
1083
1084         TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
1085                 MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
1086         MockRaftActor mockRaftActor = actorRef.underlyingActor();
1087         mockRaftActor.waitForInitializeBehaviorComplete();
1088
1089         RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior();
1090         mockRaftActor.updateConfigParams(emptyConfig);
1091         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1092         assertEquals("Behavior State", RaftState.Follower,
1093             mockRaftActor.getCurrentBehavior().state());
1094
1095         DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl();
1096         disableConfig.setCustomRaftPolicyImplementationClass(
1097             "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
1098         mockRaftActor.updateConfigParams(disableConfig);
1099         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
1100         assertEquals("Behavior State", RaftState.Follower,
1101             mockRaftActor.getCurrentBehavior().state());
1102
1103         behavior = mockRaftActor.getCurrentBehavior();
1104         mockRaftActor.updateConfigParams(disableConfig);
1105         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1106         assertEquals("Behavior State", RaftState.Follower,
1107             mockRaftActor.getCurrentBehavior().state());
1108
1109         DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl();
1110         defaultConfig.setCustomRaftPolicyImplementationClass(
1111             "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy");
1112         mockRaftActor.updateConfigParams(defaultConfig);
1113         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
1114         assertEquals("Behavior State", RaftState.Follower,
1115             mockRaftActor.getCurrentBehavior().state());
1116
1117         behavior = mockRaftActor.getCurrentBehavior();
1118         mockRaftActor.updateConfigParams(defaultConfig);
1119         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1120         assertEquals("Behavior State", RaftState.Follower,
1121             mockRaftActor.getCurrentBehavior().state());
1122     }
1123
1124     @Test
1125     public void testGetSnapshot() throws Exception {
1126         TEST_LOG.info("testGetSnapshot starting");
1127
1128         JavaTestKit kit = new JavaTestKit(getSystem());
1129
1130         String persistenceId = factory.generateActorId("test-actor-");
1131         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1132         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1133
1134         long term = 3;
1135         long seqN = 1;
1136         InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
1137         InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0,
1138                 new MockRaftActorContext.MockPayload("A")));
1139         InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1,
1140                 new MockRaftActorContext.MockPayload("B")));
1141         InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
1142         InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2,
1143                 new MockRaftActorContext.MockPayload("C")));
1144
1145         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
1146                 ImmutableMap.<String, String>builder().put("member1", "address").build(), config).
1147                     withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1148         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1149
1150         mockRaftActor.waitForRecoveryComplete();
1151
1152         // Wait for snapshot after recovery
1153         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
1154
1155         mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
1156         doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
1157
1158         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1159
1160         ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
1161         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
1162
1163         byte[] stateSnapshot = new byte[]{1,2,3};
1164         replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
1165
1166         GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
1167
1168         assertEquals("getId", persistenceId, reply.getId());
1169         Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
1170         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1171         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1172         assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
1173         assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
1174         assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
1175         assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
1176         assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
1177         assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
1178         assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
1179
1180         // Test with timeout
1181
1182         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
1183         reset(mockRaftActor.snapshotCohortDelegate);
1184         doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
1185
1186         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1187         Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
1188         assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
1189
1190         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
1191
1192         // Test with persistence disabled.
1193
1194         mockRaftActor.setPersistence(false);
1195         reset(mockRaftActor.snapshotCohortDelegate);
1196         doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
1197
1198         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1199         reply = kit.expectMsgClass(GetSnapshotReply.class);
1200         verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
1201
1202         assertEquals("getId", persistenceId, reply.getId());
1203         replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
1204         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1205         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1206         assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
1207         assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
1208         assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
1209         assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
1210         assertEquals("getState length", 0, replySnapshot.getState().length);
1211         assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
1212
1213         TEST_LOG.info("testGetSnapshot ending");
1214     }
1215
1216     @Test
1217     public void testRestoreFromSnapshot() throws Exception {
1218         TEST_LOG.info("testRestoreFromSnapshot starting");
1219
1220         String persistenceId = factory.generateActorId("test-actor-");
1221         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1222         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1223
1224         List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
1225         snapshotUnappliedEntries.add(new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
1226                 new MockRaftActorContext.MockPayload("E")));
1227
1228         int snapshotLastApplied = 3;
1229         int snapshotLastIndex = 4;
1230
1231         List<MockPayload> state = Arrays.asList(
1232                 new MockRaftActorContext.MockPayload("A"),
1233                 new MockRaftActorContext.MockPayload("B"),
1234                 new MockRaftActorContext.MockPayload("C"),
1235                 new MockRaftActorContext.MockPayload("D"));
1236         ByteString stateBytes = fromObject(state);
1237
1238         Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries,
1239                 snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1");
1240
1241         InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
1242
1243         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1244                 config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
1245                     withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1246         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1247
1248         mockRaftActor.waitForRecoveryComplete();
1249
1250         Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class);
1251         assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm());
1252         assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor());
1253         assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex());
1254         assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
1255         assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
1256         assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
1257         assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState());
1258         assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
1259
1260         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class));
1261
1262         RaftActorContext context = mockRaftActor.getRaftActorContext();
1263         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1264         assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
1265         assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
1266         assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
1267         assertEquals("Recovered state", state, mockRaftActor.getState());
1268         assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
1269         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1270
1271         // Test with data persistence disabled
1272
1273         snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
1274                 -1, -1, -1, -1, 5, "member-1");
1275
1276         persistenceId = factory.generateActorId("test-actor-");
1277
1278         raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1279                 config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).
1280                 persistent(Optional.of(Boolean.FALSE)).props().
1281                     withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1282         mockRaftActor = raftActorRef.underlyingActor();
1283
1284         mockRaftActor.waitForRecoveryComplete();
1285         assertEquals("snapshot committed", true,
1286                 Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS));
1287
1288         context = mockRaftActor.getRaftActorContext();
1289         assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm());
1290         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1291
1292         TEST_LOG.info("testRestoreFromSnapshot ending");
1293     }
1294
1295     @Test
1296     public void testRestoreFromSnapshotWithRecoveredData() throws Exception {
1297         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting");
1298
1299         String persistenceId = factory.generateActorId("test-actor-");
1300         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1301         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1302
1303         List<MockPayload> state = Arrays.asList(new MockRaftActorContext.MockPayload("A"));
1304         Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.<ReplicatedLogEntry>asList(),
1305                 5, 2, 5, 2, 2, "member-1");
1306
1307         InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
1308                 new MockRaftActorContext.MockPayload("B")));
1309
1310         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1311                 config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
1312                     withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1313         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1314
1315         mockRaftActor.waitForRecoveryComplete();
1316
1317         verify(mockRaftActor.snapshotCohortDelegate, timeout(500).never()).applySnapshot(any(byte[].class));
1318
1319         RaftActorContext context = mockRaftActor.getRaftActorContext();
1320         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1321         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
1322         assertEquals("Last applied", -1, context.getLastApplied());
1323         assertEquals("Commit index", -1, context.getCommitIndex());
1324         assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
1325         assertEquals("Voted for", null, context.getTermInformation().getVotedFor());
1326
1327         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
1328     }
1329
1330     @Test
1331     public void testNonVotingOnRecovery() throws Exception {
1332         TEST_LOG.info("testNonVotingOnRecovery starting");
1333
1334         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1335         config.setElectionTimeoutFactor(1);
1336         config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
1337
1338         String persistenceId = factory.generateActorId("test-actor-");
1339         InMemoryJournal.addEntry(persistenceId, 1,  new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
1340                 new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false)))));
1341
1342         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1343                 config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1344         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1345
1346         mockRaftActor.waitForInitializeBehaviorComplete();
1347
1348         // Sleep a bit and verify it didn't get an election timeout and schedule an election.
1349
1350         Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
1351         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
1352
1353         TEST_LOG.info("testNonVotingOnRecovery ending");
1354     }
1355
1356     @Test
1357     public void testLeaderTransitioning() throws Exception {
1358         TEST_LOG.info("testLeaderTransitioning starting");
1359
1360         TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
1361                 Props.create(MessageCollectorActor.class));
1362
1363         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1364         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1365
1366         String persistenceId = factory.generateActorId("test-actor-");
1367
1368         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1369                 config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1370         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1371
1372         mockRaftActor.waitForInitializeBehaviorComplete();
1373
1374         raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.<ReplicatedLogEntry>emptyList(),
1375                 0L, -1L, (short)1), ActorRef.noSender());
1376         LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1377                 notifierActor, LeaderStateChanged.class);
1378         assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
1379
1380         MessageCollectorActor.clearMessages(notifierActor);
1381
1382         raftActorRef.tell(new LeaderTransitioning(), ActorRef.noSender());
1383
1384         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1385         assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
1386         assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
1387
1388         TEST_LOG.info("testLeaderTransitioning ending");
1389     }
1390 }