Add election info to Snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.anyObject;
17 import static org.mockito.Matchers.eq;
18 import static org.mockito.Matchers.same;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.dispatch.Dispatchers;
27 import akka.japi.Procedure;
28 import akka.persistence.SaveSnapshotFailure;
29 import akka.persistence.SaveSnapshotSuccess;
30 import akka.persistence.SnapshotMetadata;
31 import akka.persistence.SnapshotOffer;
32 import akka.testkit.JavaTestKit;
33 import akka.testkit.TestActorRef;
34 import com.google.common.base.Optional;
35 import com.google.common.collect.ImmutableMap;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayOutputStream;
39 import java.io.ObjectOutputStream;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collections;
43 import java.util.HashMap;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.concurrent.TimeUnit;
47 import org.junit.After;
48 import org.junit.Before;
49 import org.junit.Test;
50 import org.opendaylight.controller.cluster.DataPersistenceProvider;
51 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
52 import org.opendaylight.controller.cluster.PersistentDataProvider;
53 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
54 import org.opendaylight.controller.cluster.notifications.RoleChanged;
55 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
57 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
59 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
60 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
62 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
63 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
64 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
65 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
66 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
67 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
68 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
69 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
70 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
71 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
72 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
75 import scala.concurrent.duration.FiniteDuration;
76
77 public class RaftActorTest extends AbstractActorTest {
78
79     static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
80
81     private TestActorFactory factory;
82
       /**
        * Creates a new {@link TestActorFactory} bound to the actor system returned by
        * {@code getSystem()} before each test.
        */
83     @Before
84     public void setUp(){
85         factory = new TestActorFactory(getSystem());
86     }
87
       /**
        * Closes the per-test actor factory and clears the in-memory journal and snapshot store
        * after each test.
        *
        * @throws Exception if closing the factory fails
        */
88     @After
89     public void tearDown() throws Exception {
90         factory.close();
91         InMemoryJournal.clear();
92         InMemorySnapshotStore.clear();
93     }
94
       /**
        * Constructs a {@code RaftActorTestKit} and waits until it reports a leader.
        */
95     @Test
96     public void testConstruction() {
97         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
98     }
99
        /**
         * Creates a {@code RaftActorTestKit} named "testFindLeader" and waits until it reports a leader.
         *
         * NOTE(review): apart from the actor name this performs the same steps as
         * {@code testConstruction}; no explicit find-leader request is issued here - confirm intent.
         */
100     @Test
101     public void testFindLeaderWhenLeaderIsSelf(){
102         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
103         kit.waitUntilLeader();
104     }
105
106     @Test
107     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
108         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
109
110         new JavaTestKit(getSystem()) {{
111             String persistenceId = factory.generateActorId("follower-");
112
113             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
114
115             // Set the heartbeat interval high to essentially disable election otherwise the test
116             // may fail if the actor is switched to Leader and the commitIndex is set to the last
117             // log entry.
118             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
119
120             ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
121             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
122                     peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
123
124             watch(followerActor);
125
126             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
127             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
128                     new MockRaftActorContext.MockPayload("E"));
129             snapshotUnappliedEntries.add(entry1);
130
131             int lastAppliedDuringSnapshotCapture = 3;
132             int lastIndexDuringSnapshotCapture = 4;
133
134             // 4 messages as part of snapshot, which are applied to state
135             ByteString snapshotBytes = fromObject(Arrays.asList(
136                     new MockRaftActorContext.MockPayload("A"),
137                     new MockRaftActorContext.MockPayload("B"),
138                     new MockRaftActorContext.MockPayload("C"),
139                     new MockRaftActorContext.MockPayload("D")));
140
141             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
142                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
143                     lastAppliedDuringSnapshotCapture, 1);
144             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
145
146             // add more entries after snapshot is taken
147             List<ReplicatedLogEntry> entries = new ArrayList<>();
148             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
149                     new MockRaftActorContext.MockPayload("F", 2));
150             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
151                     new MockRaftActorContext.MockPayload("G", 3));
152             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
153                     new MockRaftActorContext.MockPayload("H", 4));
154             entries.add(entry2);
155             entries.add(entry3);
156             entries.add(entry4);
157
158             int lastAppliedToState = 5;
159             int lastIndex = 7;
160
161             InMemoryJournal.addEntry(persistenceId, 5, entry2);
162             // 2 entries are applied to state besides the 4 entries in snapshot
163             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
164             InMemoryJournal.addEntry(persistenceId, 7, entry3);
165             InMemoryJournal.addEntry(persistenceId, 8, entry4);
166
167             // kill the actor
168             followerActor.tell(PoisonPill.getInstance(), null);
169             expectMsgClass(duration("5 seconds"), Terminated.class);
170
171             unwatch(followerActor);
172
173             //reinstate the actor
174             TestActorRef<MockRaftActor> ref = factory.createTestActor(
175                     MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
176
177             MockRaftActor mockRaftActor = ref.underlyingActor();
178
179             mockRaftActor.waitForRecoveryComplete();
180
181             RaftActorContext context = mockRaftActor.getRaftActorContext();
182             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
183                     context.getReplicatedLog().size());
184             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
185             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
186             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
187             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
188             assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
189
190             mockRaftActor.waitForInitializeBehaviorComplete();
191
192             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
193         }};
194
195         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
196     }
197
        /**
         * Starts a {@code MockRaftActor} with a {@link NonPersistentDataProvider} and one configured
         * peer, waits for recovery and behavior initialization to complete, and verifies the actor
         * reports the Follower state.
         *
         * @throws Exception if the test fails unexpectedly
         */
198     @Test
199     public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
200         new JavaTestKit(getSystem()) {{
201             String persistenceId = factory.generateActorId("follower-");
202
203             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
204
                // Same very long heartbeat interval as the persistence-enabled recovery test, where it
                // is used to effectively disable elections.
205             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
206
207             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
208                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
209                     Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
210
211             MockRaftActor mockRaftActor = ref.underlyingActor();
212
213             mockRaftActor.waitForRecoveryComplete();
214
215             mockRaftActor.waitForInitializeBehaviorComplete();
216
217             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
218         }};
219     }
220
        /**
         * Verifies that an {@code UpdateElectionTerm} entry is written to the journal even when the
         * actor is created with a {@link NonPersistentDataProvider}, and that a restarted actor with
         * the same persistence id recovers the term and voted-for values from it.
         *
         * @throws Exception if the test fails unexpectedly
         */
221     @Test
222     public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
223         new JavaTestKit(getSystem()) {{
224             String persistenceId = factory.generateActorId("follower-");
225             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
                // Short heartbeat and timeout factor; presumably so an election (and hence a term
                // update) happens quickly - confirm against the election timeout calculation.
226             config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
227             config.setElectionTimeoutFactor(1);
228
                // Latch lets the test block until one journal write has completed for this id.
229             InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
230
231             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
232                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
233                     Optional.<ConfigParams>of(config), new NonPersistentDataProvider()).
234                             withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
235
236             InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
237             List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
238             assertEquals("UpdateElectionTerm entries", 1, entries.size());
239             UpdateElectionTerm updateEntry = entries.get(0);
240
241             factory.killActor(ref, this);
242
                // Re-create the actor with the same persistence id (under a new actor name) and a
                // very long heartbeat interval before checking the recovered term information.
243             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
244             ref = factory.createTestActor(MockRaftActor.props(persistenceId,
245                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
246                     Optional.<ConfigParams>of(config), new NonPersistentDataProvider()).
247                             withDispatcher(Dispatchers.DefaultDispatcherId()),
248                             factory.generateActorId("follower-"));
249
250             MockRaftActor actor = ref.underlyingActor();
251             actor.waitForRecoveryComplete();
252
253             RaftActorContext newContext = actor.getRaftActorContext();
254             assertEquals("electionTerm", updateEntry.getCurrentTerm(),
255                     newContext.getTermInformation().getCurrentTerm());
256             assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
257
                // The journal must still contain exactly one UpdateElectionTerm entry after the restart.
258             entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
259             assertEquals("UpdateElectionTerm entries", 1, entries.size());
260         }};
261     }
262
        /**
         * Verifies that {@code handleRecover} passes each kind of recovery message to the installed
         * {@code RaftActorRecoverySupport}.
         *
         * <p>A mock support object is installed after the actor's own recovery has completed; the test
         * then feeds a snapshot offer, a log entry, apply-entries markers, delete-entries messages and
         * election-term updates (including the deprecated nested {@code RaftActor} variants) to
         * {@code handleRecover} and verifies each was handed to the mock with the same instance.
         */
263     @Test
264     public void testRaftActorForwardsToRaftActorRecoverySupport() {
265         String persistenceId = factory.generateActorId("leader-");
266
267         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
268
269         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
270
271         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
272                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
273
274         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
275
276         // Wait for akka's recovery to complete so it doesn't interfere.
277         mockRaftActor.waitForRecoveryComplete();
278
279         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
280         mockRaftActor.setRaftActorRecoverySupport(mockSupport );
281
282         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
283         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
284         mockRaftActor.handleRecover(snapshotOffer);
285
286         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
287                 1, new MockRaftActorContext.MockPayload("1", 5));
288         mockRaftActor.handleRecover(logEntry);
289
290         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
291         mockRaftActor.handleRecover(applyJournalEntries);
292
293         ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
294         mockRaftActor.handleRecover(applyLogEntries);
295
296         DeleteEntries deleteEntries = new DeleteEntries(1);
297         mockRaftActor.handleRecover(deleteEntries);
298
            // The nested RaftActor.DeleteEntries / RaftActor.UpdateElectionTerm classes are exercised
            // as well, via their fully-qualified names to avoid clashing with the imported ones.
299         org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
300                 new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
301         mockRaftActor.handleRecover(deprecatedDeleteEntries);
302
303         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
304         mockRaftActor.handleRecover(updateElectionTerm);
305
306         org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
307                 new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
308         mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
309
            // Each message must have been forwarded as the very same instance.
310         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
311         verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
312         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
313         verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class));
314         verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
315         verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
316         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
317         verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
318     }
319
        /**
         * Verifies that {@code handleCommand} passes snapshot-related messages to the
         * {@code RaftActorSnapshotMessageSupport} supplied when the actor is created.
         *
         * <p>For each message the mock is stubbed to return {@code true} from
         * {@code handleSnapshotMessage} before the message is sent; at the end the test verifies the
         * mock received every message as the same instance.
         */
320     @Test
321     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
322         String persistenceId = factory.generateActorId("leader-");
323
324         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
325
326         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
327
328         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
329
330         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
331                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
332
333         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
334
335         // Wait for akka's recovery to complete so it doesn't interfere.
336         mockRaftActor.waitForRecoveryComplete();
337
338         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
339         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
340         mockRaftActor.handleCommand(applySnapshot);
341
342         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
343         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
344         mockRaftActor.handleCommand(captureSnapshot);
345
346         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
347         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
348         mockRaftActor.handleCommand(captureSnapshotReply);
349
350         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
351         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
352         mockRaftActor.handleCommand(saveSnapshotSuccess);
353
354         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
355         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
356         mockRaftActor.handleCommand(saveSnapshotFailure);
357
358         doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
359         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
360
            // Every message sent above must have reached the snapshot support exactly once.
361         verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
362         verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
363         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
364         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
365         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
366         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
367     }
368
        /**
         * Verifies that delivering an {@code ApplyJournalEntries} message to a leader results in a
         * {@code persist} call on the actor's {@link DataPersistenceProvider}.
         *
         * @throws Exception if the test fails unexpectedly
         */
369     @Test
370     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
371         new JavaTestKit(getSystem()) {
372             {
373                 String persistenceId = factory.generateActorId("leader-");
374
375                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
376
377                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
378
                    // Mocked provider so the persist call can be verified below.
379                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
380
381                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
382                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
383
384                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
385
386                 mockRaftActor.waitForInitializeBehaviorComplete();
387
388                 mockRaftActor.waitUntilLeader();
389
390                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
391
392                 verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
393
394             }
395
396         };
397     }
398
        /**
         * Verifies that an {@code ApplyState} message delivered through {@code onReceiveCommand} is
         * forwarded to the actor delegate's {@code applyState} with the same client actor and
         * identifier.
         *
         * @throws Exception if the test fails unexpectedly
         */
399     @Test
400     public void testApplyState() throws Exception {
401
402         new JavaTestKit(getSystem()) {
403             {
404                 String persistenceId = factory.generateActorId("leader-");
405
406                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
407
408                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
409
410                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
411
412                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
413                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
414
415                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
416
417                 mockRaftActor.waitForInitializeBehaviorComplete();
418
419                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
420                         new MockRaftActorContext.MockPayload("F"));
421
422                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
423
                    // Only the client actor and identifier are matched exactly; the data argument may
                    // be anything.
424                 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
425
426             }
427         };
428     }
429
430     @Test
431     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
432         new JavaTestKit(getSystem()) {{
433             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
434                     Props.create(MessageCollectorActor.class));
435             MessageCollectorActor.waitUntilReady(notifierActor);
436
437             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
438             long heartBeatInterval = 100;
439             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
440             config.setElectionTimeoutFactor(20);
441
442             String persistenceId = factory.generateActorId("notifier-");
443
444             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
445                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
446                     new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
447
448             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
449
450
451             // check if the notifier got a role change from null to Follower
452             RoleChanged raftRoleChanged = matches.get(0);
453             assertEquals(persistenceId, raftRoleChanged.getMemberId());
454             assertNull(raftRoleChanged.getOldRole());
455             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
456
457             // check if the notifier got a role change from Follower to Candidate
458             raftRoleChanged = matches.get(1);
459             assertEquals(persistenceId, raftRoleChanged.getMemberId());
460             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
461             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
462
463             // check if the notifier got a role change from Candidate to Leader
464             raftRoleChanged = matches.get(2);
465             assertEquals(persistenceId, raftRoleChanged.getMemberId());
466             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
467             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
468
469             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
470                     notifierActor, LeaderStateChanged.class);
471
472             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
473             assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
474
475             notifierActor.underlyingActor().clear();
476
477             MockRaftActor raftActor = raftActorRef.underlyingActor();
478             final String newLeaderId = "new-leader";
479             final short newLeaderVersion = 6;
480             Follower follower = new Follower(raftActor.getRaftActorContext()) {
481                 @Override
482                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
483                     leaderId = newLeaderId;
484                     setLeaderPayloadVersion(newLeaderVersion);
485                     return this;
486                 }
487             };
488
489             raftActor.newBehavior(follower);
490
491             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
492             assertEquals(persistenceId, leaderStateChange.getMemberId());
493             assertEquals(null, leaderStateChange.getLeaderId());
494
495             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
496             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
497             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
498
499             notifierActor.underlyingActor().clear();
500
501             raftActor.handleCommand("any");
502
503             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
504             assertEquals(persistenceId, leaderStateChange.getMemberId());
505             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
506             assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
507
508             notifierActor.underlyingActor().clear();
509
510             raftActor.handleCommand("any");
511
512             Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
513             leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
514             assertNull(leaderStateChange);
515         }};
516     }
517
        /**
         * Verifies the role-change notifications sent by a {@code MockRaftActor} configured with a
         * single peer at the address "fake/path": exactly two {@code RoleChanged} messages are
         * expected (null to Follower, then Follower to Candidate).
         *
         * @throws Exception if the test fails unexpectedly
         */
518     @Test
519     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
520         new JavaTestKit(getSystem()) {{
521             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
522             MessageCollectorActor.waitUntilReady(notifierActor);
523
524             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
525             long heartBeatInterval = 100;
526             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
527             config.setElectionTimeoutFactor(1);
528
529             String persistenceId = factory.generateActorId("notifier-");
530
531             factory.createActor(MockRaftActor.props(persistenceId,
532                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
533
                // Poll the collector for up to roughly 5 seconds (5000 / heartBeatInterval iterations).
                // NOTE(review): the loop only exits early at 3 matches while the assertion below
                // expects 2, so in the passing case every iteration runs - presumably to give an
                // unexpected third role change time to show up; confirm this is intentional.
534             List<RoleChanged> matches =  null;
535             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
536                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
537                 assertNotNull(matches);
538                 if(matches.size() == 3) {
539                     break;
540                 }
541                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
542             }
543
544             assertNotNull(matches);
545             assertEquals(2, matches.size());
546
547             // check if the notifier got a role change from null to Follower
548             RoleChanged raftRoleChanged = matches.get(0);
549             assertEquals(persistenceId, raftRoleChanged.getMemberId());
550             assertNull(raftRoleChanged.getOldRole());
551             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
552
553             // check if the notifier got a role change from Follower to Candidate
554             raftRoleChanged = matches.get(1);
555             assertEquals(persistenceId, raftRoleChanged.getMemberId());
556             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
557             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
558
559         }};
560     }
561
562     @Test
563     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
564         new JavaTestKit(getSystem()) {
565             {
566                 String persistenceId = factory.generateActorId("leader-");
567                 String follower1Id = factory.generateActorId("follower-");
568
569                 ActorRef followerActor1 =
570                         factory.createActor(Props.create(MessageCollectorActor.class));
571
572                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
573                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
574                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
575
576                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
577
578                 Map<String, String> peerAddresses = new HashMap<>();
579                 peerAddresses.put(follower1Id, followerActor1.path().toString());
580
581                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
582                         MockRaftActor.props(persistenceId, peerAddresses,
583                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
584
585                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
586
587                 leaderActor.getRaftActorContext().setCommitIndex(4);
588                 leaderActor.getRaftActorContext().setLastApplied(4);
589                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
590
591                 leaderActor.waitForInitializeBehaviorComplete();
592
593                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
594
595                 Leader leader = new Leader(leaderActor.getRaftActorContext());
596                 leaderActor.setCurrentBehavior(leader);
597                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
598
599                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
600                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
601
602                 assertEquals(8, leaderActor.getReplicatedLog().size());
603
604                 leaderActor.getRaftActorContext().getSnapshotManager()
605                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
606                                 new MockRaftActorContext.MockPayload("x")), 4);
607
608                 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
609
610                 assertEquals(8, leaderActor.getReplicatedLog().size());
611
612                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
613                 //fake snapshot on index 5
614                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
615
616                 assertEquals(8, leaderActor.getReplicatedLog().size());
617
618                 //fake snapshot on index 6
619                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
620                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
621                 assertEquals(8, leaderActor.getReplicatedLog().size());
622
623                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
624
625                 assertEquals(8, leaderActor.getReplicatedLog().size());
626
627                 ByteString snapshotBytes = fromObject(Arrays.asList(
628                         new MockRaftActorContext.MockPayload("foo-0"),
629                         new MockRaftActorContext.MockPayload("foo-1"),
630                         new MockRaftActorContext.MockPayload("foo-2"),
631                         new MockRaftActorContext.MockPayload("foo-3"),
632                         new MockRaftActorContext.MockPayload("foo-4")));
633
634                 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
635                         leader, Runtime.getRuntime().totalMemory());
636
637                 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
638
639                 // The commit is needed to complete the snapshot creation process
640                 leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
641
642                 // capture snapshot reply should remove the snapshotted entries only
643                 assertEquals(3, leaderActor.getReplicatedLog().size());
644                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
645
646                 // add another non-replicated entry
647                 leaderActor.getReplicatedLog().append(
648                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
649
650                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
651                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
652                 assertEquals(2, leaderActor.getReplicatedLog().size());
653                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
654
655             }
656         };
657     }
658
659     @Test
660     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
661         new JavaTestKit(getSystem()) {
662             {
663                 String persistenceId = factory.generateActorId("follower-");
664                 String leaderId = factory.generateActorId("leader-");
665
666
667                 ActorRef leaderActor1 =
668                         factory.createActor(Props.create(MessageCollectorActor.class));
669
670                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
671                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
672                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
673
674                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
675
676                 Map<String, String> peerAddresses = new HashMap<>();
677                 peerAddresses.put(leaderId, leaderActor1.path().toString());
678
679                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
680                         MockRaftActor.props(persistenceId, peerAddresses,
681                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
682
683                 MockRaftActor followerActor = mockActorRef.underlyingActor();
684                 followerActor.getRaftActorContext().setCommitIndex(4);
685                 followerActor.getRaftActorContext().setLastApplied(4);
686                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
687
688                 followerActor.waitForInitializeBehaviorComplete();
689
690
691                 Follower follower = new Follower(followerActor.getRaftActorContext());
692                 followerActor.setCurrentBehavior(follower);
693                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
694
695                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
696                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
697                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
698
699                 // log has indices 0-5
700                 assertEquals(6, followerActor.getReplicatedLog().size());
701
702                 //snapshot on 4
703                 followerActor.getRaftActorContext().getSnapshotManager().capture(
704                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
705                                 new MockRaftActorContext.MockPayload("D")), 4);
706
707                 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
708
709                 assertEquals(6, followerActor.getReplicatedLog().size());
710
711                 //fake snapshot on index 6
712                 List<ReplicatedLogEntry> entries =
713                         Arrays.asList(
714                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
715                                         new MockRaftActorContext.MockPayload("foo-6"))
716                         );
717                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
718                 assertEquals(7, followerActor.getReplicatedLog().size());
719
720                 //fake snapshot on index 7
721                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
722
723                 entries =
724                         Arrays.asList(
725                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
726                                         new MockRaftActorContext.MockPayload("foo-7"))
727                         );
728                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
729                 assertEquals(8, followerActor.getReplicatedLog().size());
730
731                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
732
733
734                 ByteString snapshotBytes = fromObject(Arrays.asList(
735                         new MockRaftActorContext.MockPayload("foo-0"),
736                         new MockRaftActorContext.MockPayload("foo-1"),
737                         new MockRaftActorContext.MockPayload("foo-2"),
738                         new MockRaftActorContext.MockPayload("foo-3"),
739                         new MockRaftActorContext.MockPayload("foo-4")));
740                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
741                 assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
742
743                 // The commit is needed to complete the snapshot creation process
744                 followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
745
746                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
747                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
748                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
749
750                 entries =
751                         Arrays.asList(
752                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
753                                         new MockRaftActorContext.MockPayload("foo-7"))
754                         );
755                 // send an additional entry 8 with leaderCommit = 7
756                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
757
758                 // 7 and 8, as lastapplied is 7
759                 assertEquals(2, followerActor.getReplicatedLog().size());
760
761             }
762         };
763     }
764
765     @Test
766     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
767         new JavaTestKit(getSystem()) {
768             {
769                 String persistenceId = factory.generateActorId("leader-");
770                 String follower1Id = factory.generateActorId("follower-");
771                 String follower2Id = factory.generateActorId("follower-");
772
773                 ActorRef followerActor1 =
774                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
775                 ActorRef followerActor2 =
776                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
777
778                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
779                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
780                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
781
782                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
783
784                 Map<String, String> peerAddresses = new HashMap<>();
785                 peerAddresses.put(follower1Id, followerActor1.path().toString());
786                 peerAddresses.put(follower2Id, followerActor2.path().toString());
787
788                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
789                         MockRaftActor.props(persistenceId, peerAddresses,
790                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
791
792                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
793                 leaderActor.getRaftActorContext().setCommitIndex(9);
794                 leaderActor.getRaftActorContext().setLastApplied(9);
795                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
796
797                 leaderActor.waitForInitializeBehaviorComplete();
798
799                 Leader leader = new Leader(leaderActor.getRaftActorContext());
800                 leaderActor.setCurrentBehavior(leader);
801                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
802
803                 // create 5 entries in the log
804                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
805                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
806
807                 //set the snapshot index to 4 , 0 to 4 are snapshotted
808                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
809                 //setting replicatedToAllIndex = 9, for the log to clear
810                 leader.setReplicatedToAllIndex(9);
811                 assertEquals(5, leaderActor.getReplicatedLog().size());
812                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
813
814                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
815                 assertEquals(5, leaderActor.getReplicatedLog().size());
816                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
817
818                 // set the 2nd follower nextIndex to 1 which has been snapshotted
819                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
820                 assertEquals(5, leaderActor.getReplicatedLog().size());
821                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
822
823                 // simulate a real snapshot
824                 leaderActor.onReceiveCommand(new SendHeartBeat());
825                 assertEquals(5, leaderActor.getReplicatedLog().size());
826                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
827                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
828                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
829
830
831                 //reply from a slow follower does not initiate a fake snapshot
832                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
833                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
834
835                 ByteString snapshotBytes = fromObject(Arrays.asList(
836                         new MockRaftActorContext.MockPayload("foo-0"),
837                         new MockRaftActorContext.MockPayload("foo-1"),
838                         new MockRaftActorContext.MockPayload("foo-2"),
839                         new MockRaftActorContext.MockPayload("foo-3"),
840                         new MockRaftActorContext.MockPayload("foo-4")));
841                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
842                 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
843
844                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
845
846                 //reply from a slow follower after should not raise errors
847                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
848                 assertEquals(0, leaderActor.getReplicatedLog().size());
849             }
850         };
851     }
852
853     @Test
854     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
855         new JavaTestKit(getSystem()) {{
856             String persistenceId = factory.generateActorId("leader-");
857             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
858             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
859             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
860             config.setSnapshotBatchCount(5);
861
862             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
863
864             Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
865
866             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
867                     MockRaftActor.props(persistenceId, peerAddresses,
868                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
869
870             MockRaftActor leaderActor = mockActorRef.underlyingActor();
871             leaderActor.getRaftActorContext().setCommitIndex(3);
872             leaderActor.getRaftActorContext().setLastApplied(3);
873             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
874
875             leaderActor.waitForInitializeBehaviorComplete();
876             for(int i=0;i< 4;i++) {
877                 leaderActor.getReplicatedLog()
878                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
879                                 new MockRaftActorContext.MockPayload("A")));
880             }
881
882             Leader leader = new Leader(leaderActor.getRaftActorContext());
883             leaderActor.setCurrentBehavior(leader);
884             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
885
886             // Simulate an install snaphost to a follower.
887             leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
888                     leaderActor.getReplicatedLog().last(), -1, "member1");
889
890             // Now send a CaptureSnapshotReply
891             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
892
893             // Trimming log in this scenario is a no-op
894             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
895             assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
896             assertEquals(-1, leader.getReplicatedToAllIndex());
897
898         }};
899     }
900
901     @Test
902     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
903         new JavaTestKit(getSystem()) {{
904             String persistenceId = factory.generateActorId("leader-");
905             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
906             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
907             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
908             config.setSnapshotBatchCount(5);
909
910             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
911
912             Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
913
914             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
915                     MockRaftActor.props(persistenceId, peerAddresses,
916                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
917
918             MockRaftActor leaderActor = mockActorRef.underlyingActor();
919             leaderActor.getRaftActorContext().setCommitIndex(3);
920             leaderActor.getRaftActorContext().setLastApplied(3);
921             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
922             leaderActor.getReplicatedLog().setSnapshotIndex(3);
923
924             leaderActor.waitForInitializeBehaviorComplete();
925             Leader leader = new Leader(leaderActor.getRaftActorContext());
926             leaderActor.setCurrentBehavior(leader);
927             leader.setReplicatedToAllIndex(3);
928             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
929
930             // Persist another entry (this will cause a CaptureSnapshot to be triggered
931             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
932
933             // Now send a CaptureSnapshotReply
934             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
935
936             // Trimming log in this scenario is a no-op
937             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
938             assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
939             assertEquals(3, leader.getReplicatedToAllIndex());
940
941         }};
942     }
943
944     @Test
945     public void testSwitchBehavior(){
946         String persistenceId = factory.generateActorId("leader-");
947         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
948         config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
949         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
950         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
951         config.setSnapshotBatchCount(5);
952
953         DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
954
955         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
956
957         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
958                 MockRaftActor.props(persistenceId, peerAddresses,
959                         Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
960
961         MockRaftActor leaderActor = mockActorRef.underlyingActor();
962
963         leaderActor.waitForRecoveryComplete();
964
965         leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
966
967         assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
968         assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
969
970         leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
971
972         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
973         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
974
975         leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
976
977         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
978         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
979
980         leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
981
982         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
983         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
984     }
985
986     public static ByteString fromObject(Object snapshot) throws Exception {
987         ByteArrayOutputStream b = null;
988         ObjectOutputStream o = null;
989         try {
990             b = new ByteArrayOutputStream();
991             o = new ObjectOutputStream(b);
992             o.writeObject(snapshot);
993             byte[] snapshotBytes = b.toByteArray();
994             return ByteString.copyFrom(snapshotBytes);
995         } finally {
996             if (o != null) {
997                 o.flush();
998                 o.close();
999             }
1000             if (b != null) {
1001                 b.close();
1002             }
1003         }
1004     }
1005
1006 }