BUG-5626: use Identifier instead of String
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertArrayEquals;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNotSame;
15 import static org.junit.Assert.assertNull;
16 import static org.junit.Assert.assertSame;
17 import static org.junit.Assert.assertTrue;
18 import static org.mockito.Matchers.any;
19 import static org.mockito.Matchers.anyObject;
20 import static org.mockito.Matchers.eq;
21 import static org.mockito.Matchers.same;
22 import static org.mockito.Mockito.doReturn;
23 import static org.mockito.Mockito.mock;
24 import static org.mockito.Mockito.never;
25 import static org.mockito.Mockito.reset;
26 import static org.mockito.Mockito.timeout;
27 import static org.mockito.Mockito.verify;
28 import akka.actor.ActorRef;
29 import akka.actor.PoisonPill;
30 import akka.actor.Props;
31 import akka.actor.Status.Failure;
32 import akka.actor.Terminated;
33 import akka.dispatch.Dispatchers;
34 import akka.japi.Procedure;
35 import akka.persistence.SaveSnapshotFailure;
36 import akka.persistence.SaveSnapshotSuccess;
37 import akka.persistence.SnapshotMetadata;
38 import akka.persistence.SnapshotOffer;
39 import akka.testkit.JavaTestKit;
40 import akka.testkit.TestActorRef;
41 import com.google.common.base.Optional;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.util.concurrent.Uninterruptibles;
44 import com.google.protobuf.ByteString;
45 import java.io.ByteArrayOutputStream;
46 import java.io.ObjectOutputStream;
47 import java.util.ArrayList;
48 import java.util.Arrays;
49 import java.util.Collections;
50 import java.util.HashMap;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.TimeoutException;
55 import org.apache.commons.lang3.SerializationUtils;
56 import org.junit.After;
57 import org.junit.Before;
58 import org.junit.Test;
59 import org.mockito.ArgumentCaptor;
60 import org.opendaylight.controller.cluster.DataPersistenceProvider;
61 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
62 import org.opendaylight.controller.cluster.PersistentDataProvider;
63 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
64 import org.opendaylight.controller.cluster.notifications.RoleChanged;
65 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
66 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
67 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
68 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
69 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
70 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
71 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
72 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
73 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
74 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
75 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
76 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
77 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
78 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
79 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
80 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
81 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
82 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
83 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
84 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
85 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
86 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
87 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
88 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
89 import org.opendaylight.yangtools.concepts.Identifier;
90 import org.opendaylight.yangtools.util.StringIdentifier;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
93 import scala.concurrent.duration.Duration;
94 import scala.concurrent.duration.FiniteDuration;
95
96 public class RaftActorTest extends AbstractActorTest {
97
98     static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
99
100     private TestActorFactory factory;
101
102     @Before
103     public void setUp(){
104         factory = new TestActorFactory(getSystem());
105     }
106
107     @After
108     public void tearDown() throws Exception {
109         factory.close();
110         InMemoryJournal.clear();
111         InMemorySnapshotStore.clear();
112     }
113
114     @Test
115     public void testConstruction() {
116         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
117     }
118
119     @Test
120     public void testFindLeaderWhenLeaderIsSelf(){
121         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
122         kit.waitUntilLeader();
123     }
124
125
126     @Test
127     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
128         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
129
130         new JavaTestKit(getSystem()) {{
131             String persistenceId = factory.generateActorId("follower-");
132
133             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
134
135             // Set the heartbeat interval high to essentially disable election otherwise the test
136             // may fail if the actor is switched to Leader and the commitIndex is set to the last
137             // log entry.
138             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
139
140             ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
141             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
142                     peerAddresses, config), persistenceId);
143
144             watch(followerActor);
145
146             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
147             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
148                     new MockRaftActorContext.MockPayload("E"));
149             snapshotUnappliedEntries.add(entry1);
150
151             int lastAppliedDuringSnapshotCapture = 3;
152             int lastIndexDuringSnapshotCapture = 4;
153
154             // 4 messages as part of snapshot, which are applied to state
155             ByteString snapshotBytes = fromObject(Arrays.asList(
156                     new MockRaftActorContext.MockPayload("A"),
157                     new MockRaftActorContext.MockPayload("B"),
158                     new MockRaftActorContext.MockPayload("C"),
159                     new MockRaftActorContext.MockPayload("D")));
160
161             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
162                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
163                     lastAppliedDuringSnapshotCapture, 1);
164             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
165
166             // add more entries after snapshot is taken
167             List<ReplicatedLogEntry> entries = new ArrayList<>();
168             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
169                     new MockRaftActorContext.MockPayload("F", 2));
170             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
171                     new MockRaftActorContext.MockPayload("G", 3));
172             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
173                     new MockRaftActorContext.MockPayload("H", 4));
174             entries.add(entry2);
175             entries.add(entry3);
176             entries.add(entry4);
177
178             int lastAppliedToState = 5;
179             int lastIndex = 7;
180
181             InMemoryJournal.addEntry(persistenceId, 5, entry2);
182             // 2 entries are applied to state besides the 4 entries in snapshot
183             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
184             InMemoryJournal.addEntry(persistenceId, 7, entry3);
185             InMemoryJournal.addEntry(persistenceId, 8, entry4);
186
187             // kill the actor
188             followerActor.tell(PoisonPill.getInstance(), null);
189             expectMsgClass(duration("5 seconds"), Terminated.class);
190
191             unwatch(followerActor);
192
193             //reinstate the actor
194             TestActorRef<MockRaftActor> ref = factory.createTestActor(
195                     MockRaftActor.props(persistenceId, peerAddresses, config));
196
197             MockRaftActor mockRaftActor = ref.underlyingActor();
198
199             mockRaftActor.waitForRecoveryComplete();
200
201             RaftActorContext context = mockRaftActor.getRaftActorContext();
202             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
203                     context.getReplicatedLog().size());
204             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
205             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
206             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
207             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
208             assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
209
210             mockRaftActor.waitForInitializeBehaviorComplete();
211
212             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
213         }};
214
215         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
216     }
217
218     @Test
219     public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
220         new JavaTestKit(getSystem()) {{
221             String persistenceId = factory.generateActorId("follower-");
222
223             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
224
225             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
226
227             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
228                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
229                     config, new NonPersistentDataProvider()), persistenceId);
230
231             MockRaftActor mockRaftActor = ref.underlyingActor();
232
233             mockRaftActor.waitForRecoveryComplete();
234
235             mockRaftActor.waitForInitializeBehaviorComplete();
236
237             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
238         }};
239     }
240
241     @Test
242     public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
243         new JavaTestKit(getSystem()) {{
244             String persistenceId = factory.generateActorId("follower-");
245             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
246             config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
247             config.setElectionTimeoutFactor(1);
248
249             InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
250
251             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
252                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
253                     config, new NonPersistentDataProvider()).
254                             withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
255
256             InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
257             List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
258             assertEquals("UpdateElectionTerm entries", 1, entries.size());
259             UpdateElectionTerm updateEntry = entries.get(0);
260
261             factory.killActor(ref, this);
262
263             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
264             ref = factory.createTestActor(MockRaftActor.props(persistenceId,
265                     ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
266                     new NonPersistentDataProvider()).
267                             withDispatcher(Dispatchers.DefaultDispatcherId()),
268                             factory.generateActorId("follower-"));
269
270             MockRaftActor actor = ref.underlyingActor();
271             actor.waitForRecoveryComplete();
272
273             RaftActorContext newContext = actor.getRaftActorContext();
274             assertEquals("electionTerm", updateEntry.getCurrentTerm(),
275                     newContext.getTermInformation().getCurrentTerm());
276             assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
277
278             entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
279             assertEquals("UpdateElectionTerm entries", 1, entries.size());
280         }};
281     }
282
283     @Test
284     public void testRaftActorForwardsToRaftActorRecoverySupport() {
285         String persistenceId = factory.generateActorId("leader-");
286
287         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
288
289         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
290
291         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
292                 Collections.<String, String>emptyMap(), config), persistenceId);
293
294         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
295
296         // Wait for akka's recovery to complete so it doesn't interfere.
297         mockRaftActor.waitForRecoveryComplete();
298
299         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
300         mockRaftActor.setRaftActorRecoverySupport(mockSupport );
301
302         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
303         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
304         mockRaftActor.handleRecover(snapshotOffer);
305
306         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
307                 1, new MockRaftActorContext.MockPayload("1", 5));
308         mockRaftActor.handleRecover(logEntry);
309
310         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
311         mockRaftActor.handleRecover(applyJournalEntries);
312
313         ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
314         mockRaftActor.handleRecover(applyLogEntries);
315
316         DeleteEntries deleteEntries = new DeleteEntries(1);
317         mockRaftActor.handleRecover(deleteEntries);
318
319         org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
320                 new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
321         mockRaftActor.handleRecover(deprecatedDeleteEntries);
322
323         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
324         mockRaftActor.handleRecover(updateElectionTerm);
325
326         org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
327                 new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
328         mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
329
330         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
331         verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
332         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
333         verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class));
334         verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
335         verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
336         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
337         verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
338     }
339
340     @Test
341     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
342         String persistenceId = factory.generateActorId("leader-");
343
344         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
345
346         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
347
348         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
349
350         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
351                 config(config).snapshotMessageSupport(mockSupport).props());
352
353         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
354
355         // Wait for akka's recovery to complete so it doesn't interfere.
356         mockRaftActor.waitForRecoveryComplete();
357
358         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
359         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
360         mockRaftActor.handleCommand(applySnapshot);
361
362         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
363         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
364         mockRaftActor.handleCommand(captureSnapshot);
365
366         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
367         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
368         mockRaftActor.handleCommand(captureSnapshotReply);
369
370         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L));
371         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
372         mockRaftActor.handleCommand(saveSnapshotSuccess);
373
374         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L), new Throwable());
375         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
376         mockRaftActor.handleCommand(saveSnapshotFailure);
377
378         doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
379                 any(ActorRef.class));
380         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
381
382         doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
383         mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
384
385         verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
386         verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
387         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
388         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
389         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
390         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
391                 any(ActorRef.class));
392         verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
393     }
394
395     @Test
396     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
397         new JavaTestKit(getSystem()) {
398             {
399                 String persistenceId = factory.generateActorId("leader-");
400
401                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
402
403                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
404
405                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
406
407                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
408                         Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
409
410                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
411
412                 mockRaftActor.waitForInitializeBehaviorComplete();
413
414                 mockRaftActor.waitUntilLeader();
415
416                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
417
418                 verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
419
420             }
421
422         };
423     }
424
425     @Test
426     public void testApplyState() throws Exception {
427
428         new JavaTestKit(getSystem()) {
429             {
430                 String persistenceId = factory.generateActorId("leader-");
431
432                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
433
434                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
435
436                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
437
438                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
439                         Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
440
441                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
442
443                 mockRaftActor.waitForInitializeBehaviorComplete();
444
445                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
446                         new MockRaftActorContext.MockPayload("F"));
447
448                 final Identifier id = new StringIdentifier("apply-state");
449                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
450
451                 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
452
453             }
454         };
455     }
456
457     @Test
458     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
459         new JavaTestKit(getSystem()) {{
460             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
461                     Props.create(MessageCollectorActor.class));
462             MessageCollectorActor.waitUntilReady(notifierActor);
463
464             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
465             long heartBeatInterval = 100;
466             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
467             config.setElectionTimeoutFactor(20);
468
469             String persistenceId = factory.generateActorId("notifier-");
470
471             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
472                     config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
473                             new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
474                             persistenceId);
475
476             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
477
478
479             // check if the notifier got a role change from null to Follower
480             RoleChanged raftRoleChanged = matches.get(0);
481             assertEquals(persistenceId, raftRoleChanged.getMemberId());
482             assertNull(raftRoleChanged.getOldRole());
483             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
484
485             // check if the notifier got a role change from Follower to Candidate
486             raftRoleChanged = matches.get(1);
487             assertEquals(persistenceId, raftRoleChanged.getMemberId());
488             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
489             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
490
491             // check if the notifier got a role change from Candidate to Leader
492             raftRoleChanged = matches.get(2);
493             assertEquals(persistenceId, raftRoleChanged.getMemberId());
494             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
495             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
496
497             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
498                     notifierActor, LeaderStateChanged.class);
499
500             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
501             assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
502
503             notifierActor.underlyingActor().clear();
504
505             MockRaftActor raftActor = raftActorRef.underlyingActor();
506             final String newLeaderId = "new-leader";
507             final short newLeaderVersion = 6;
508             Follower follower = new Follower(raftActor.getRaftActorContext()) {
509                 @Override
510                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
511                     setLeaderId(newLeaderId);
512                     setLeaderPayloadVersion(newLeaderVersion);
513                     return this;
514                 }
515             };
516
517             raftActor.newBehavior(follower);
518
519             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
520             assertEquals(persistenceId, leaderStateChange.getMemberId());
521             assertEquals(null, leaderStateChange.getLeaderId());
522
523             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
524             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
525             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
526
527             notifierActor.underlyingActor().clear();
528
529             raftActor.handleCommand("any");
530
531             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
532             assertEquals(persistenceId, leaderStateChange.getMemberId());
533             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
534             assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
535
536             notifierActor.underlyingActor().clear();
537
538             raftActor.handleCommand("any");
539
540             Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
541             leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
542             assertNull(leaderStateChange);
543         }};
544     }
545
546     @Test
547     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
548         new JavaTestKit(getSystem()) {{
549             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
550             MessageCollectorActor.waitUntilReady(notifierActor);
551
552             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
553             long heartBeatInterval = 100;
554             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
555             config.setElectionTimeoutFactor(1);
556
557             String persistenceId = factory.generateActorId("notifier-");
558
559             factory.createActor(MockRaftActor.builder().id(persistenceId).
560                     peerAddresses(ImmutableMap.of("leader", "fake/path")).
561                     config(config).roleChangeNotifier(notifierActor).props());
562
563             List<RoleChanged> matches =  null;
564             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
565                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
566                 assertNotNull(matches);
567                 if(matches.size() == 3) {
568                     break;
569                 }
570                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
571             }
572
573             assertNotNull(matches);
574             assertEquals(2, matches.size());
575
576             // check if the notifier got a role change from null to Follower
577             RoleChanged raftRoleChanged = matches.get(0);
578             assertEquals(persistenceId, raftRoleChanged.getMemberId());
579             assertNull(raftRoleChanged.getOldRole());
580             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
581
582             // check if the notifier got a role change from Follower to Candidate
583             raftRoleChanged = matches.get(1);
584             assertEquals(persistenceId, raftRoleChanged.getMemberId());
585             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
586             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
587
588         }};
589     }
590
591     @Test
592     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
593         new JavaTestKit(getSystem()) {
594             {
595                 String persistenceId = factory.generateActorId("leader-");
596                 String follower1Id = factory.generateActorId("follower-");
597
598                 ActorRef followerActor1 =
599                         factory.createActor(Props.create(MessageCollectorActor.class));
600
601                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
602                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
603                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
604
605                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
606
607                 Map<String, String> peerAddresses = new HashMap<>();
608                 peerAddresses.put(follower1Id, followerActor1.path().toString());
609
610                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
611                         MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
612
613                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
614
615                 leaderActor.getRaftActorContext().setCommitIndex(4);
616                 leaderActor.getRaftActorContext().setLastApplied(4);
617                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
618
619                 leaderActor.waitForInitializeBehaviorComplete();
620
621                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
622
623                 Leader leader = new Leader(leaderActor.getRaftActorContext());
624                 leaderActor.setCurrentBehavior(leader);
625                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
626
627                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
628                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
629
630                 assertEquals(8, leaderActor.getReplicatedLog().size());
631
632                 leaderActor.getRaftActorContext().getSnapshotManager()
633                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
634                                 new MockRaftActorContext.MockPayload("x")), 4);
635
636                 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
637
638                 assertEquals(8, leaderActor.getReplicatedLog().size());
639
640                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
641                 //fake snapshot on index 5
642                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
643
644                 assertEquals(8, leaderActor.getReplicatedLog().size());
645
646                 //fake snapshot on index 6
647                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
648                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
649                 assertEquals(8, leaderActor.getReplicatedLog().size());
650
651                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
652
653                 assertEquals(8, leaderActor.getReplicatedLog().size());
654
655                 ByteString snapshotBytes = fromObject(Arrays.asList(
656                         new MockRaftActorContext.MockPayload("foo-0"),
657                         new MockRaftActorContext.MockPayload("foo-1"),
658                         new MockRaftActorContext.MockPayload("foo-2"),
659                         new MockRaftActorContext.MockPayload("foo-3"),
660                         new MockRaftActorContext.MockPayload("foo-4")));
661
662                 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
663                         Runtime.getRuntime().totalMemory());
664
665                 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
666
667                 // The commit is needed to complete the snapshot creation process
668                 leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
669
670                 // capture snapshot reply should remove the snapshotted entries only
671                 assertEquals(3, leaderActor.getReplicatedLog().size());
672                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
673
674                 // add another non-replicated entry
675                 leaderActor.getReplicatedLog().append(
676                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
677
678                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
679                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
680                 assertEquals(2, leaderActor.getReplicatedLog().size());
681                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
682
683             }
684         };
685     }
686
687     @Test
688     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
689         new JavaTestKit(getSystem()) {
690             {
691                 String persistenceId = factory.generateActorId("follower-");
692                 String leaderId = factory.generateActorId("leader-");
693
694
695                 ActorRef leaderActor1 =
696                         factory.createActor(Props.create(MessageCollectorActor.class));
697
698                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
699                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
700                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
701
702                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
703
704                 Map<String, String> peerAddresses = new HashMap<>();
705                 peerAddresses.put(leaderId, leaderActor1.path().toString());
706
707                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
708                         MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
709
710                 MockRaftActor followerActor = mockActorRef.underlyingActor();
711                 followerActor.getRaftActorContext().setCommitIndex(4);
712                 followerActor.getRaftActorContext().setLastApplied(4);
713                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
714
715                 followerActor.waitForInitializeBehaviorComplete();
716
717
718                 Follower follower = new Follower(followerActor.getRaftActorContext());
719                 followerActor.setCurrentBehavior(follower);
720                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
721
722                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
723                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
724                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
725
726                 // log has indices 0-5
727                 assertEquals(6, followerActor.getReplicatedLog().size());
728
729                 //snapshot on 4
730                 followerActor.getRaftActorContext().getSnapshotManager().capture(
731                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
732                                 new MockRaftActorContext.MockPayload("D")), 4);
733
734                 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
735
736                 assertEquals(6, followerActor.getReplicatedLog().size());
737
738                 //fake snapshot on index 6
739                 List<ReplicatedLogEntry> entries =
740                         Arrays.asList(
741                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
742                                         new MockRaftActorContext.MockPayload("foo-6"))
743                         );
744                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
745                 assertEquals(7, followerActor.getReplicatedLog().size());
746
747                 //fake snapshot on index 7
748                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
749
750                 entries =
751                         Arrays.asList(
752                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
753                                         new MockRaftActorContext.MockPayload("foo-7"))
754                         );
755                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
756                 assertEquals(8, followerActor.getReplicatedLog().size());
757
758                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
759
760
761                 ByteString snapshotBytes = fromObject(Arrays.asList(
762                         new MockRaftActorContext.MockPayload("foo-0"),
763                         new MockRaftActorContext.MockPayload("foo-1"),
764                         new MockRaftActorContext.MockPayload("foo-2"),
765                         new MockRaftActorContext.MockPayload("foo-3"),
766                         new MockRaftActorContext.MockPayload("foo-4")));
767                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
768                 assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
769
770                 // The commit is needed to complete the snapshot creation process
771                 followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
772
773                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
774                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
775                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
776
777                 entries =
778                         Arrays.asList(
779                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
780                                         new MockRaftActorContext.MockPayload("foo-7"))
781                         );
782                 // send an additional entry 8 with leaderCommit = 7
783                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
784
785                 // 7 and 8, as lastapplied is 7
786                 assertEquals(2, followerActor.getReplicatedLog().size());
787
788             }
789         };
790     }
791
792     @Test
793     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
794         new JavaTestKit(getSystem()) {
795             {
796                 String persistenceId = factory.generateActorId("leader-");
797                 String follower1Id = factory.generateActorId("follower-");
798                 String follower2Id = factory.generateActorId("follower-");
799
800                 ActorRef followerActor1 =
801                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
802                 ActorRef followerActor2 =
803                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
804
805                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
806                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
807                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
808
809                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
810
811                 Map<String, String> peerAddresses = new HashMap<>();
812                 peerAddresses.put(follower1Id, followerActor1.path().toString());
813                 peerAddresses.put(follower2Id, followerActor2.path().toString());
814
815                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
816                         MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
817
818                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
819                 leaderActor.getRaftActorContext().setCommitIndex(9);
820                 leaderActor.getRaftActorContext().setLastApplied(9);
821                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
822
823                 leaderActor.waitForInitializeBehaviorComplete();
824
825                 Leader leader = new Leader(leaderActor.getRaftActorContext());
826                 leaderActor.setCurrentBehavior(leader);
827                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
828
829                 // create 5 entries in the log
830                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
831                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
832
833                 //set the snapshot index to 4 , 0 to 4 are snapshotted
834                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
835                 //setting replicatedToAllIndex = 9, for the log to clear
836                 leader.setReplicatedToAllIndex(9);
837                 assertEquals(5, leaderActor.getReplicatedLog().size());
838                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
839
840                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
841                 assertEquals(5, leaderActor.getReplicatedLog().size());
842                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
843
844                 // set the 2nd follower nextIndex to 1 which has been snapshotted
845                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
846                 assertEquals(5, leaderActor.getReplicatedLog().size());
847                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
848
849                 // simulate a real snapshot
850                 leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE);
851                 assertEquals(5, leaderActor.getReplicatedLog().size());
852                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
853                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
854                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
855
856
857                 //reply from a slow follower does not initiate a fake snapshot
858                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
859                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
860
861                 ByteString snapshotBytes = fromObject(Arrays.asList(
862                         new MockRaftActorContext.MockPayload("foo-0"),
863                         new MockRaftActorContext.MockPayload("foo-1"),
864                         new MockRaftActorContext.MockPayload("foo-2"),
865                         new MockRaftActorContext.MockPayload("foo-3"),
866                         new MockRaftActorContext.MockPayload("foo-4")));
867                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
868                 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
869
870                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
871
872                 //reply from a slow follower after should not raise errors
873                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
874                 assertEquals(0, leaderActor.getReplicatedLog().size());
875             }
876         };
877     }
878
879     @Test
880     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
881         new JavaTestKit(getSystem()) {{
882             String persistenceId = factory.generateActorId("leader-");
883             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
884             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
885             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
886             config.setSnapshotBatchCount(5);
887
888             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
889
890             Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
891
892             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
893                     MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
894
895             MockRaftActor leaderActor = mockActorRef.underlyingActor();
896             leaderActor.getRaftActorContext().setCommitIndex(3);
897             leaderActor.getRaftActorContext().setLastApplied(3);
898             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
899
900             leaderActor.waitForInitializeBehaviorComplete();
901             for(int i=0;i< 4;i++) {
902                 leaderActor.getReplicatedLog()
903                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
904                                 new MockRaftActorContext.MockPayload("A")));
905             }
906
907             Leader leader = new Leader(leaderActor.getRaftActorContext());
908             leaderActor.setCurrentBehavior(leader);
909             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
910
911             // Simulate an install snaphost to a follower.
912             leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
913                     leaderActor.getReplicatedLog().last(), -1, "member1");
914
915             // Now send a CaptureSnapshotReply
916             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
917
918             // Trimming log in this scenario is a no-op
919             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
920             assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
921             assertEquals(-1, leader.getReplicatedToAllIndex());
922
923         }};
924     }
925
926     @Test
927     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
928         new JavaTestKit(getSystem()) {{
929             String persistenceId = factory.generateActorId("leader-");
930             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
931             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
932             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
933             config.setSnapshotBatchCount(5);
934
935             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
936
937             Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
938
939             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
940                     MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
941
942             MockRaftActor leaderActor = mockActorRef.underlyingActor();
943             leaderActor.getRaftActorContext().setCommitIndex(3);
944             leaderActor.getRaftActorContext().setLastApplied(3);
945             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
946             leaderActor.getReplicatedLog().setSnapshotIndex(3);
947
948             leaderActor.waitForInitializeBehaviorComplete();
949             Leader leader = new Leader(leaderActor.getRaftActorContext());
950             leaderActor.setCurrentBehavior(leader);
951             leader.setReplicatedToAllIndex(3);
952             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
953
954             // Persist another entry (this will cause a CaptureSnapshot to be triggered
955             leaderActor.persistData(mockActorRef, new StringIdentifier("x"),
956                 new MockRaftActorContext.MockPayload("duh"));
957
958             // Now send a CaptureSnapshotReply
959             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
960
961             // Trimming log in this scenario is a no-op
962             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
963             assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
964             assertEquals(3, leader.getReplicatedToAllIndex());
965
966         }};
967     }
968
969     @Test
970     public void testRaftActorOnRecoverySnapshot() throws Exception {
971         TEST_LOG.info("testRaftActorOnRecoverySnapshot");
972
973         new JavaTestKit(getSystem()) {{
974                 String persistenceId = factory.generateActorId("follower-");
975
976                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
977
978                 // Set the heartbeat interval high to essentially disable election otherwise the test
979                 // may fail if the actor is switched to Leader
980                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
981
982                 ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
983
984                 // Create mock ReplicatedLogEntry
985                 ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
986                         new MockRaftActorContext.MockPayload("F", 1));
987
988                 InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
989
990                 TestActorRef<MockRaftActor> ref = factory.createTestActor(
991                         MockRaftActor.props(persistenceId, peerAddresses, config));
992
993                 MockRaftActor mockRaftActor = ref.underlyingActor();
994
995                 mockRaftActor.waitForRecoveryComplete();
996
997                 mockRaftActor.waitForInitializeBehaviorComplete();
998
999                 verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
1000             }};
1001     }
1002
1003     @Test
1004     public void testSwitchBehavior(){
1005         String persistenceId = factory.generateActorId("leader-");
1006         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1007         config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
1008         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1009         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1010         config.setSnapshotBatchCount(5);
1011
1012         DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1013
1014         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
1015
1016         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1017                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
1018
1019         MockRaftActor leaderActor = mockActorRef.underlyingActor();
1020
1021         leaderActor.waitForRecoveryComplete();
1022
1023         leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
1024
1025         assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1026         assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
1027
1028         leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
1029
1030         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1031         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1032
1033         leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
1034
1035         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1036         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1037
1038         leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
1039
1040         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1041         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1042     }
1043
1044     public static ByteString fromObject(Object snapshot) throws Exception {
1045         ByteArrayOutputStream b = null;
1046         ObjectOutputStream o = null;
1047         try {
1048             b = new ByteArrayOutputStream();
1049             o = new ObjectOutputStream(b);
1050             o.writeObject(snapshot);
1051             byte[] snapshotBytes = b.toByteArray();
1052             return ByteString.copyFrom(snapshotBytes);
1053         } finally {
1054             if (o != null) {
1055                 o.flush();
1056                 o.close();
1057             }
1058             if (b != null) {
1059                 b.close();
1060             }
1061         }
1062     }
1063
1064     @Test
1065     public void testUpdateConfigParam() throws Exception {
1066         DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
1067         String persistenceId = factory.generateActorId("follower-");
1068         ImmutableMap<String, String> peerAddresses =
1069             ImmutableMap.<String, String>builder().put("member1", "address").build();
1070         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1071
1072         TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
1073                 MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
1074         MockRaftActor mockRaftActor = actorRef.underlyingActor();
1075         mockRaftActor.waitForInitializeBehaviorComplete();
1076
1077         RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior();
1078         mockRaftActor.updateConfigParams(emptyConfig);
1079         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1080         assertEquals("Behavior State", RaftState.Follower,
1081             mockRaftActor.getCurrentBehavior().state());
1082
1083         DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl();
1084         disableConfig.setCustomRaftPolicyImplementationClass(
1085             "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
1086         mockRaftActor.updateConfigParams(disableConfig);
1087         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
1088         assertEquals("Behavior State", RaftState.Follower,
1089             mockRaftActor.getCurrentBehavior().state());
1090
1091         behavior = mockRaftActor.getCurrentBehavior();
1092         mockRaftActor.updateConfigParams(disableConfig);
1093         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1094         assertEquals("Behavior State", RaftState.Follower,
1095             mockRaftActor.getCurrentBehavior().state());
1096
1097         DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl();
1098         defaultConfig.setCustomRaftPolicyImplementationClass(
1099             "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy");
1100         mockRaftActor.updateConfigParams(defaultConfig);
1101         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
1102         assertEquals("Behavior State", RaftState.Follower,
1103             mockRaftActor.getCurrentBehavior().state());
1104
1105         behavior = mockRaftActor.getCurrentBehavior();
1106         mockRaftActor.updateConfigParams(defaultConfig);
1107         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1108         assertEquals("Behavior State", RaftState.Follower,
1109             mockRaftActor.getCurrentBehavior().state());
1110     }
1111
1112     @Test
1113     public void testGetSnapshot() throws Exception {
1114         TEST_LOG.info("testGetSnapshot starting");
1115
1116         JavaTestKit kit = new JavaTestKit(getSystem());
1117
1118         String persistenceId = factory.generateActorId("test-actor-");
1119         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1120         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1121
1122         long term = 3;
1123         long seqN = 1;
1124         InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
1125         InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0,
1126                 new MockRaftActorContext.MockPayload("A")));
1127         InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1,
1128                 new MockRaftActorContext.MockPayload("B")));
1129         InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
1130         InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2,
1131                 new MockRaftActorContext.MockPayload("C")));
1132
1133         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
1134                 ImmutableMap.<String, String>builder().put("member1", "address").build(), config).
1135                     withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1136         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1137
1138         mockRaftActor.waitForRecoveryComplete();
1139
1140         // Wait for snapshot after recovery
1141         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
1142
1143         mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
1144
1145         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1146
1147         ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
1148         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
1149
1150         byte[] stateSnapshot = new byte[]{1,2,3};
1151         replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
1152
1153         GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
1154
1155         assertEquals("getId", persistenceId, reply.getId());
1156         Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
1157         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1158         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1159         assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
1160         assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
1161         assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
1162         assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
1163         assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
1164         assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
1165         assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
1166
1167         // Test with timeout
1168
1169         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
1170         reset(mockRaftActor.snapshotCohortDelegate);
1171
1172         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1173         Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
1174         assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
1175
1176         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
1177
1178         // Test with persistence disabled.
1179
1180         mockRaftActor.setPersistence(false);
1181         reset(mockRaftActor.snapshotCohortDelegate);
1182
1183         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1184         reply = kit.expectMsgClass(GetSnapshotReply.class);
1185         verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
1186
1187         assertEquals("getId", persistenceId, reply.getId());
1188         replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
1189         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1190         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1191         assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
1192         assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
1193         assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
1194         assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
1195         assertEquals("getState length", 0, replySnapshot.getState().length);
1196         assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
1197
1198         TEST_LOG.info("testGetSnapshot ending");
1199     }
1200
1201     @Test
1202     public void testRestoreFromSnapshot() throws Exception {
1203         TEST_LOG.info("testRestoreFromSnapshot starting");
1204
1205         String persistenceId = factory.generateActorId("test-actor-");
1206         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1207         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1208
1209         List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
1210         snapshotUnappliedEntries.add(new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
1211                 new MockRaftActorContext.MockPayload("E")));
1212
1213         int snapshotLastApplied = 3;
1214         int snapshotLastIndex = 4;
1215
1216         List<MockPayload> state = Arrays.asList(
1217                 new MockRaftActorContext.MockPayload("A"),
1218                 new MockRaftActorContext.MockPayload("B"),
1219                 new MockRaftActorContext.MockPayload("C"),
1220                 new MockRaftActorContext.MockPayload("D"));
1221         ByteString stateBytes = fromObject(state);
1222
1223         Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries,
1224                 snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1");
1225
1226         InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
1227
1228         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1229                 config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
1230                     withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1231         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1232
1233         mockRaftActor.waitForRecoveryComplete();
1234
1235         Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class);
1236         assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm());
1237         assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor());
1238         assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex());
1239         assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
1240         assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
1241         assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
1242         assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState());
1243         assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
1244
1245         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class));
1246
1247         RaftActorContext context = mockRaftActor.getRaftActorContext();
1248         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1249         assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
1250         assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
1251         assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
1252         assertEquals("Recovered state", state, mockRaftActor.getState());
1253         assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
1254         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1255
1256         // Test with data persistence disabled
1257
1258         snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
1259                 -1, -1, -1, -1, 5, "member-1");
1260
1261         persistenceId = factory.generateActorId("test-actor-");
1262
1263         raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1264                 config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).
1265                 persistent(Optional.of(Boolean.FALSE)).props().
1266                     withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1267         mockRaftActor = raftActorRef.underlyingActor();
1268
1269         mockRaftActor.waitForRecoveryComplete();
1270         assertEquals("snapshot committed", true,
1271                 Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS));
1272
1273         context = mockRaftActor.getRaftActorContext();
1274         assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm());
1275         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1276
1277         TEST_LOG.info("testRestoreFromSnapshot ending");
1278     }
1279
1280     @Test
1281     public void testRestoreFromSnapshotWithRecoveredData() throws Exception {
1282         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting");
1283
1284         String persistenceId = factory.generateActorId("test-actor-");
1285         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1286         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1287
1288         List<MockPayload> state = Arrays.asList(new MockRaftActorContext.MockPayload("A"));
1289         Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.<ReplicatedLogEntry>asList(),
1290                 5, 2, 5, 2, 2, "member-1");
1291
1292         InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
1293                 new MockRaftActorContext.MockPayload("B")));
1294
1295         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1296                 config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
1297                     withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1298         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1299
1300         mockRaftActor.waitForRecoveryComplete();
1301
1302         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1303         verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(byte[].class));
1304
1305         RaftActorContext context = mockRaftActor.getRaftActorContext();
1306         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1307         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
1308         assertEquals("Last applied", -1, context.getLastApplied());
1309         assertEquals("Commit index", -1, context.getCommitIndex());
1310         assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
1311         assertEquals("Voted for", null, context.getTermInformation().getVotedFor());
1312
1313         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
1314     }
1315
1316     @Test
1317     public void testNonVotingOnRecovery() throws Exception {
1318         TEST_LOG.info("testNonVotingOnRecovery starting");
1319
1320         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1321         config.setElectionTimeoutFactor(1);
1322         config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
1323
1324         String persistenceId = factory.generateActorId("test-actor-");
1325         InMemoryJournal.addEntry(persistenceId, 1,  new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
1326                 new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false)))));
1327
1328         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1329                 config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1330         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1331
1332         mockRaftActor.waitForInitializeBehaviorComplete();
1333
1334         // Sleep a bit and verify it didn't get an election timeout and schedule an election.
1335
1336         Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
1337         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
1338
1339         TEST_LOG.info("testNonVotingOnRecovery ending");
1340     }
1341
1342     @Test
1343     public void testLeaderTransitioning() throws Exception {
1344         TEST_LOG.info("testLeaderTransitioning starting");
1345
1346         TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
1347                 Props.create(MessageCollectorActor.class));
1348
1349         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1350         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1351
1352         String persistenceId = factory.generateActorId("test-actor-");
1353
1354         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1355                 config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1356         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1357
1358         mockRaftActor.waitForInitializeBehaviorComplete();
1359
1360         raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.<ReplicatedLogEntry>emptyList(),
1361                 0L, -1L, (short)1), ActorRef.noSender());
1362         LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1363                 notifierActor, LeaderStateChanged.class);
1364         assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
1365
1366         MessageCollectorActor.clearMessages(notifierActor);
1367
1368         raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender());
1369
1370         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1371         assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
1372         assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
1373
1374         TEST_LOG.info("testLeaderTransitioning ending");
1375     }
1376 }