Bug 7521: Convert DatastoreSnapshot.ShardSnapshot to store Snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNotSame;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Matchers.anyObject;
19 import static org.mockito.Matchers.eq;
20 import static org.mockito.Matchers.same;
21 import static org.mockito.Mockito.doReturn;
22 import static org.mockito.Mockito.mock;
23 import static org.mockito.Mockito.never;
24 import static org.mockito.Mockito.reset;
25 import static org.mockito.Mockito.timeout;
26 import static org.mockito.Mockito.verify;
27
28 import akka.actor.ActorRef;
29 import akka.actor.PoisonPill;
30 import akka.actor.Props;
31 import akka.actor.Status.Failure;
32 import akka.actor.Terminated;
33 import akka.dispatch.Dispatchers;
34 import akka.japi.Procedure;
35 import akka.persistence.SaveSnapshotFailure;
36 import akka.persistence.SaveSnapshotSuccess;
37 import akka.persistence.SnapshotMetadata;
38 import akka.persistence.SnapshotOffer;
39 import akka.testkit.JavaTestKit;
40 import akka.testkit.TestActorRef;
41 import com.google.common.base.Optional;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.util.concurrent.Uninterruptibles;
44 import com.google.protobuf.ByteString;
45 import java.io.ByteArrayOutputStream;
46 import java.io.ObjectOutputStream;
47 import java.util.ArrayList;
48 import java.util.Arrays;
49 import java.util.Collections;
50 import java.util.HashMap;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.TimeoutException;
55 import org.junit.After;
56 import org.junit.Before;
57 import org.junit.Test;
58 import org.mockito.ArgumentCaptor;
59 import org.opendaylight.controller.cluster.DataPersistenceProvider;
60 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
61 import org.opendaylight.controller.cluster.PersistentDataProvider;
62 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
63 import org.opendaylight.controller.cluster.notifications.RoleChanged;
64 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
65 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
66 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
67 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
68 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
69 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
70 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
71 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
72 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
73 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
74 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
75 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
76 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
77 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
78 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
79 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
80 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
81 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
82 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
83 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
84 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
85 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
86 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
87 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
88 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
89 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
90 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
91 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
92 import org.opendaylight.yangtools.concepts.Identifier;
93 import org.slf4j.Logger;
94 import org.slf4j.LoggerFactory;
95 import scala.concurrent.duration.Duration;
96 import scala.concurrent.duration.FiniteDuration;
97
98 public class RaftActorTest extends AbstractActorTest {
99
100     static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
101
102     private TestActorFactory factory;
103
104     @Before
105     public void setUp() {
106         factory = new TestActorFactory(getSystem());
107     }
108
109     @After
110     public void tearDown() throws Exception {
111         factory.close();
112         InMemoryJournal.clear();
113         InMemorySnapshotStore.clear();
114     }
115
116     @Test
117     public void testConstruction() {
118         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
119     }
120
121     @Test
122     public void testFindLeaderWhenLeaderIsSelf() {
123         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
124         kit.waitUntilLeader();
125     }
126
127
128     @Test
129     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
130         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
131
132         JavaTestKit kit = new JavaTestKit(getSystem());
133         String persistenceId = factory.generateActorId("follower-");
134
135         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
136
137         // Set the heartbeat interval high to essentially disable election otherwise the test
138         // may fail if the actor is switched to Leader and the commitIndex is set to the last
139         // log entry.
140         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
141
142         ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder()
143                 .put("member1", "address").build();
144         ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
145                 peerAddresses, config), persistenceId);
146
147         kit.watch(followerActor);
148
149         List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
150         ReplicatedLogEntry entry1 = new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E"));
151         snapshotUnappliedEntries.add(entry1);
152
153         int lastAppliedDuringSnapshotCapture = 3;
154         int lastIndexDuringSnapshotCapture = 4;
155
156         // 4 messages as part of snapshot, which are applied to state
157         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(
158                 new MockRaftActorContext.MockPayload("A"),
159                 new MockRaftActorContext.MockPayload("B"),
160                 new MockRaftActorContext.MockPayload("C"),
161                 new MockRaftActorContext.MockPayload("D")));
162
163         Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
164                 lastAppliedDuringSnapshotCapture, 1, -1, null, null);
165         InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
166
167         // add more entries after snapshot is taken
168         List<ReplicatedLogEntry> entries = new ArrayList<>();
169         ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F", 2));
170         ReplicatedLogEntry entry3 = new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("G", 3));
171         ReplicatedLogEntry entry4 = new SimpleReplicatedLogEntry(7, 1, new MockRaftActorContext.MockPayload("H", 4));
172         entries.add(entry2);
173         entries.add(entry3);
174         entries.add(entry4);
175
176         final int lastAppliedToState = 5;
177         final int lastIndex = 7;
178
179         InMemoryJournal.addEntry(persistenceId, 5, entry2);
180         // 2 entries are applied to state besides the 4 entries in snapshot
181         InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
182         InMemoryJournal.addEntry(persistenceId, 7, entry3);
183         InMemoryJournal.addEntry(persistenceId, 8, entry4);
184
185         // kill the actor
186         followerActor.tell(PoisonPill.getInstance(), null);
187         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
188
189         kit.unwatch(followerActor);
190
191         //reinstate the actor
192         TestActorRef<MockRaftActor> ref = factory.createTestActor(
193                 MockRaftActor.props(persistenceId, peerAddresses, config));
194
195         MockRaftActor mockRaftActor = ref.underlyingActor();
196
197         mockRaftActor.waitForRecoveryComplete();
198
199         RaftActorContext context = mockRaftActor.getRaftActorContext();
200         assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
201                 context.getReplicatedLog().size());
202         assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
203         assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
204         assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
205         assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
206         assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
207
208         mockRaftActor.waitForInitializeBehaviorComplete();
209
210         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
211
212         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
213     }
214
215     @Test
216     public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
217         String persistenceId = factory.generateActorId("follower-");
218
219         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
220
221         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
222
223         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
224                 ImmutableMap.<String, String>builder().put("member1", "address").build(),
225                 config, new NonPersistentDataProvider()), persistenceId);
226
227         MockRaftActor mockRaftActor = ref.underlyingActor();
228
229         mockRaftActor.waitForRecoveryComplete();
230
231         mockRaftActor.waitForInitializeBehaviorComplete();
232
233         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
234     }
235
236     @Test
237     public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
238         final JavaTestKit kit = new JavaTestKit(getSystem());
239         String persistenceId = factory.generateActorId("follower-");
240         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
241         config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
242         config.setElectionTimeoutFactor(1);
243
244         InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
245
246         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
247                 ImmutableMap.<String, String>builder().put("member1", "address").build(),
248                 config, new NonPersistentDataProvider())
249                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
250
251         InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
252         List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
253         assertEquals("UpdateElectionTerm entries", 1, entries.size());
254         final UpdateElectionTerm updateEntry = entries.get(0);
255
256         factory.killActor(ref, kit);
257
258         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
259         ref = factory.createTestActor(MockRaftActor.props(persistenceId,
260                 ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
261                 new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
262                 factory.generateActorId("follower-"));
263
264         MockRaftActor actor = ref.underlyingActor();
265         actor.waitForRecoveryComplete();
266
267         RaftActorContext newContext = actor.getRaftActorContext();
268         assertEquals("electionTerm", updateEntry.getCurrentTerm(),
269                 newContext.getTermInformation().getCurrentTerm());
270         assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
271
272         entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
273         assertEquals("UpdateElectionTerm entries", 1, entries.size());
274     }
275
276     @Test
277     public void testRaftActorForwardsToRaftActorRecoverySupport() {
278         String persistenceId = factory.generateActorId("leader-");
279
280         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
281
282         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
283
284         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
285                 Collections.<String, String>emptyMap(), config), persistenceId);
286
287         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
288
289         // Wait for akka's recovery to complete so it doesn't interfere.
290         mockRaftActor.waitForRecoveryComplete();
291
292         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
293         mockRaftActor.setRaftActorRecoverySupport(mockSupport);
294
295         Snapshot snapshot = Snapshot.create(ByteState.of(new byte[]{1}),
296                 Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
297         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
298         mockRaftActor.handleRecover(snapshotOffer);
299
300         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
301         mockRaftActor.handleRecover(logEntry);
302
303         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
304         mockRaftActor.handleRecover(applyJournalEntries);
305
306         DeleteEntries deleteEntries = new DeleteEntries(1);
307         mockRaftActor.handleRecover(deleteEntries);
308
309         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
310         mockRaftActor.handleRecover(updateElectionTerm);
311
312         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
313         verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
314         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
315         verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
316         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
317     }
318
319     @Test
320     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
321         String persistenceId = factory.generateActorId("leader-");
322
323         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
324
325         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
326
327         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
328
329         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
330                 .config(config).snapshotMessageSupport(mockSupport).props());
331
332         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
333
334         // Wait for akka's recovery to complete so it doesn't interfere.
335         mockRaftActor.waitForRecoveryComplete();
336
337         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
338         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
339         mockRaftActor.handleCommand(applySnapshot);
340
341         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(ByteState.empty(),
342                 java.util.Optional.empty());
343         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
344         mockRaftActor.handleCommand(captureSnapshotReply);
345
346         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L));
347         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
348         mockRaftActor.handleCommand(saveSnapshotSuccess);
349
350         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L),
351                 new Throwable());
352         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
353         mockRaftActor.handleCommand(saveSnapshotFailure);
354
355         doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
356                 any(ActorRef.class));
357         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
358
359         doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
360         mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
361
362         verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
363         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
364         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
365         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
366         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
367                 any(ActorRef.class));
368         verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
369     }
370
371     @SuppressWarnings("unchecked")
372     @Test
373     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
374         String persistenceId = factory.generateActorId("leader-");
375
376         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
377
378         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
379
380         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
381
382         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
383                 Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
384
385         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
386
387         mockRaftActor.waitForInitializeBehaviorComplete();
388
389         mockRaftActor.waitUntilLeader();
390
391         mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
392
393         verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Procedure.class));
394     }
395
396     @Test
397     public void testApplyState() throws Exception {
398         String persistenceId = factory.generateActorId("leader-");
399
400         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
401
402         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
403
404         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
405
406         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
407                 Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
408
409         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
410
411         mockRaftActor.waitForInitializeBehaviorComplete();
412
413         ReplicatedLogEntry entry = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F"));
414
415         final Identifier id = new MockIdentifier("apply-state");
416         mockRaftActor.getRaftActorContext().getApplyStateConsumer().accept(new ApplyState(mockActorRef, id, entry));
417
418         verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
419     }
420
421     @Test
422     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
423         TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
424                 Props.create(MessageCollectorActor.class));
425         MessageCollectorActor.waitUntilReady(notifierActor);
426
427         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
428         long heartBeatInterval = 100;
429         config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
430         config.setElectionTimeoutFactor(20);
431
432         String persistenceId = factory.generateActorId("notifier-");
433
434         final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
435                 .id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
436                         new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
437                 persistenceId);
438
439         List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
440
441
442         // check if the notifier got a role change from null to Follower
443         RoleChanged raftRoleChanged = matches.get(0);
444         assertEquals(persistenceId, raftRoleChanged.getMemberId());
445         assertNull(raftRoleChanged.getOldRole());
446         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
447
448         // check if the notifier got a role change from Follower to Candidate
449         raftRoleChanged = matches.get(1);
450         assertEquals(persistenceId, raftRoleChanged.getMemberId());
451         assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
452         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
453
454         // check if the notifier got a role change from Candidate to Leader
455         raftRoleChanged = matches.get(2);
456         assertEquals(persistenceId, raftRoleChanged.getMemberId());
457         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
458         assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
459
460         LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
461                 notifierActor, LeaderStateChanged.class);
462
463         assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
464         assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
465
466         notifierActor.underlyingActor().clear();
467
468         MockRaftActor raftActor = raftActorRef.underlyingActor();
469         final String newLeaderId = "new-leader";
470         final short newLeaderVersion = 6;
471         Follower follower = new Follower(raftActor.getRaftActorContext()) {
472             @Override
473             public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
474                 setLeaderId(newLeaderId);
475                 setLeaderPayloadVersion(newLeaderVersion);
476                 return this;
477             }
478         };
479
480         raftActor.newBehavior(follower);
481
482         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
483         assertEquals(persistenceId, leaderStateChange.getMemberId());
484         assertEquals(null, leaderStateChange.getLeaderId());
485
486         raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
487         assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
488         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
489
490         notifierActor.underlyingActor().clear();
491
492         raftActor.handleCommand("any");
493
494         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
495         assertEquals(persistenceId, leaderStateChange.getMemberId());
496         assertEquals(newLeaderId, leaderStateChange.getLeaderId());
497         assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
498
499         notifierActor.underlyingActor().clear();
500
501         raftActor.handleCommand("any");
502
503         Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
504         leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
505         assertNull(leaderStateChange);
506     }
507
508     @Test
509     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
510         ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
511         MessageCollectorActor.waitUntilReady(notifierActor);
512
513         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
514         long heartBeatInterval = 100;
515         config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
516         config.setElectionTimeoutFactor(1);
517
518         String persistenceId = factory.generateActorId("notifier-");
519
520         factory.createActor(MockRaftActor.builder().id(persistenceId)
521                 .peerAddresses(ImmutableMap.of("leader", "fake/path"))
522                 .config(config).roleChangeNotifier(notifierActor).props());
523
524         List<RoleChanged> matches =  null;
525         for (int i = 0; i < 5000 / heartBeatInterval; i++) {
526             matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
527             assertNotNull(matches);
528             if (matches.size() == 3) {
529                 break;
530             }
531             Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
532         }
533
534         assertNotNull(matches);
535         assertEquals(2, matches.size());
536
537         // check if the notifier got a role change from null to Follower
538         RoleChanged raftRoleChanged = matches.get(0);
539         assertEquals(persistenceId, raftRoleChanged.getMemberId());
540         assertNull(raftRoleChanged.getOldRole());
541         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
542
543         // check if the notifier got a role change from Follower to Candidate
544         raftRoleChanged = matches.get(1);
545         assertEquals(persistenceId, raftRoleChanged.getMemberId());
546         assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
547         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
548     }
549
550     @Test
551     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
552         final String persistenceId = factory.generateActorId("leader-");
553         final String follower1Id = factory.generateActorId("follower-");
554
555         ActorRef followerActor1 =
556                 factory.createActor(Props.create(MessageCollectorActor.class));
557
558         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
559         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
560         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
561
562         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
563
564         Map<String, String> peerAddresses = new HashMap<>();
565         peerAddresses.put(follower1Id, followerActor1.path().toString());
566
567         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
568                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
569
570         MockRaftActor leaderActor = mockActorRef.underlyingActor();
571
572         leaderActor.getRaftActorContext().setCommitIndex(4);
573         leaderActor.getRaftActorContext().setLastApplied(4);
574         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
575
576         leaderActor.waitForInitializeBehaviorComplete();
577
578         // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
579
580         Leader leader = new Leader(leaderActor.getRaftActorContext());
581         leaderActor.setCurrentBehavior(leader);
582         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
583
584         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
585         leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
586
587         assertEquals(8, leaderActor.getReplicatedLog().size());
588
589         leaderActor.getRaftActorContext().getSnapshotManager().capture(
590                 new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("x")), 4);
591
592         verify(leaderActor.snapshotCohortDelegate).createSnapshot(anyObject(), anyObject());
593
594         assertEquals(8, leaderActor.getReplicatedLog().size());
595
596         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
597         //fake snapshot on index 5
598         leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
599
600         assertEquals(8, leaderActor.getReplicatedLog().size());
601
602         //fake snapshot on index 6
603         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
604         leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
605         assertEquals(8, leaderActor.getReplicatedLog().size());
606
607         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
608
609         assertEquals(8, leaderActor.getReplicatedLog().size());
610
611         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(
612                 new MockRaftActorContext.MockPayload("foo-0"),
613                 new MockRaftActorContext.MockPayload("foo-1"),
614                 new MockRaftActorContext.MockPayload("foo-2"),
615                 new MockRaftActorContext.MockPayload("foo-3"),
616                 new MockRaftActorContext.MockPayload("foo-4")));
617
618         leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotState, java.util.Optional.empty(),
619                 Runtime.getRuntime().totalMemory());
620
621         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
622
623         // The commit is needed to complete the snapshot creation process
624         leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
625
626         // capture snapshot reply should remove the snapshotted entries only
627         assertEquals(3, leaderActor.getReplicatedLog().size());
628         assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
629
630         // add another non-replicated entry
631         leaderActor.getReplicatedLog().append(
632                 new SimpleReplicatedLogEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
633
634         //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
635         leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
636         assertEquals(2, leaderActor.getReplicatedLog().size());
637         assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
638     }
639
640     @Test
641     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
642         final String persistenceId = factory.generateActorId("follower-");
643         final String leaderId = factory.generateActorId("leader-");
644
645
646         ActorRef leaderActor1 =
647                 factory.createActor(Props.create(MessageCollectorActor.class));
648
649         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
650         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
651         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
652
653         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
654
655         Map<String, String> peerAddresses = new HashMap<>();
656         peerAddresses.put(leaderId, leaderActor1.path().toString());
657
658         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
659                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
660
661         MockRaftActor followerActor = mockActorRef.underlyingActor();
662         followerActor.getRaftActorContext().setCommitIndex(4);
663         followerActor.getRaftActorContext().setLastApplied(4);
664         followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
665
666         followerActor.waitForInitializeBehaviorComplete();
667
668
669         Follower follower = new Follower(followerActor.getRaftActorContext());
670         followerActor.setCurrentBehavior(follower);
671         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
672
673         // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
674         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
675         followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
676
677         // log has indices 0-5
678         assertEquals(6, followerActor.getReplicatedLog().size());
679
680         //snapshot on 4
681         followerActor.getRaftActorContext().getSnapshotManager().capture(
682                 new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("D")), 4);
683
684         verify(followerActor.snapshotCohortDelegate).createSnapshot(anyObject(), anyObject());
685
686         assertEquals(6, followerActor.getReplicatedLog().size());
687
688         //fake snapshot on index 6
689         List<ReplicatedLogEntry> entries = Arrays.asList(
690                 (ReplicatedLogEntry) new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("foo-6")));
691         followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
692         assertEquals(7, followerActor.getReplicatedLog().size());
693
694         //fake snapshot on index 7
695         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
696
697         entries = Arrays.asList((ReplicatedLogEntry) new SimpleReplicatedLogEntry(7, 1,
698                 new MockRaftActorContext.MockPayload("foo-7")));
699         followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
700         assertEquals(8, followerActor.getReplicatedLog().size());
701
702         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
703
704
705         ByteString snapshotBytes = fromObject(Arrays.asList(
706                 new MockRaftActorContext.MockPayload("foo-0"),
707                 new MockRaftActorContext.MockPayload("foo-1"),
708                 new MockRaftActorContext.MockPayload("foo-2"),
709                 new MockRaftActorContext.MockPayload("foo-3"),
710                 new MockRaftActorContext.MockPayload("foo-4")));
711         followerActor.onReceiveCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
712                 java.util.Optional.empty()));
713         assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
714
715         // The commit is needed to complete the snapshot creation process
716         followerActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
717
718         // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
719         assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
720         assertEquals(7, followerActor.getReplicatedLog().lastIndex());
721
722         entries = Arrays.asList((ReplicatedLogEntry) new SimpleReplicatedLogEntry(8, 1,
723                 new MockRaftActorContext.MockPayload("foo-7")));
724         // send an additional entry 8 with leaderCommit = 7
725         followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
726
727         // 7 and 8, as lastapplied is 7
728         assertEquals(2, followerActor.getReplicatedLog().size());
729     }
730
731     @Test
732     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
733         final String persistenceId = factory.generateActorId("leader-");
734         final String follower1Id = factory.generateActorId("follower-");
735         final String follower2Id = factory.generateActorId("follower-");
736
737         final ActorRef followerActor1 = factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
738         final ActorRef followerActor2 = factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
739
740         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
741         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
742         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
743
744         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
745
746         Map<String, String> peerAddresses = new HashMap<>();
747         peerAddresses.put(follower1Id, followerActor1.path().toString());
748         peerAddresses.put(follower2Id, followerActor2.path().toString());
749
750         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
751                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
752
753         MockRaftActor leaderActor = mockActorRef.underlyingActor();
754         leaderActor.getRaftActorContext().setCommitIndex(9);
755         leaderActor.getRaftActorContext().setLastApplied(9);
756         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
757
758         leaderActor.waitForInitializeBehaviorComplete();
759
760         Leader leader = new Leader(leaderActor.getRaftActorContext());
761         leaderActor.setCurrentBehavior(leader);
762         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
763
764         // create 5 entries in the log
765         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
766         leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
767
768         //set the snapshot index to 4 , 0 to 4 are snapshotted
769         leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
770         //setting replicatedToAllIndex = 9, for the log to clear
771         leader.setReplicatedToAllIndex(9);
772         assertEquals(5, leaderActor.getReplicatedLog().size());
773         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
774
775         leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
776         assertEquals(5, leaderActor.getReplicatedLog().size());
777         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
778
779         // set the 2nd follower nextIndex to 1 which has been snapshotted
780         leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
781         assertEquals(5, leaderActor.getReplicatedLog().size());
782         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
783
784         // simulate a real snapshot
785         leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE);
786         assertEquals(5, leaderActor.getReplicatedLog().size());
787         assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
788                 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()),
789                 RaftState.Leader, leaderActor.getCurrentBehavior().state());
790
791
792         //reply from a slow follower does not initiate a fake snapshot
793         leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
794         assertEquals("Fake snapshot should not happen when Initiate is in progress", 5,
795                 leaderActor.getReplicatedLog().size());
796
797         ByteString snapshotBytes = fromObject(Arrays.asList(
798                 new MockRaftActorContext.MockPayload("foo-0"),
799                 new MockRaftActorContext.MockPayload("foo-1"),
800                 new MockRaftActorContext.MockPayload("foo-2"),
801                 new MockRaftActorContext.MockPayload("foo-3"),
802                 new MockRaftActorContext.MockPayload("foo-4")));
803         leaderActor.onReceiveCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
804                 java.util.Optional.empty()));
805         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
806
807         assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0,
808                 leaderActor.getReplicatedLog().size());
809
810         //reply from a slow follower after should not raise errors
811         leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
812         assertEquals(0, leaderActor.getReplicatedLog().size());
813     }
814
815     @Test
816     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
817         String persistenceId = factory.generateActorId("leader-");
818         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
819         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
820         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
821         config.setSnapshotBatchCount(5);
822
823         DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
824
825         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
826
827         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
828                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
829
830         MockRaftActor leaderActor = mockActorRef.underlyingActor();
831         leaderActor.getRaftActorContext().setCommitIndex(3);
832         leaderActor.getRaftActorContext().setLastApplied(3);
833         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
834
835         leaderActor.waitForInitializeBehaviorComplete();
836         for (int i = 0; i < 4; i++) {
837             leaderActor.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
838                     new MockRaftActorContext.MockPayload("A")));
839         }
840
841         Leader leader = new Leader(leaderActor.getRaftActorContext());
842         leaderActor.setCurrentBehavior(leader);
843         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
844
845         // Simulate an install snaphost to a follower.
846         leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
847                 leaderActor.getReplicatedLog().last(), -1, "member1");
848
849         // Now send a CaptureSnapshotReply
850         mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
851                 java.util.Optional.empty()), mockActorRef);
852
853         // Trimming log in this scenario is a no-op
854         assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
855         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
856         assertEquals(-1, leader.getReplicatedToAllIndex());
857     }
858
859     @Test
860     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
861         String persistenceId = factory.generateActorId("leader-");
862         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
863         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
864         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
865         config.setSnapshotBatchCount(5);
866
867         DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
868
869         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
870
871         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
872                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
873
874         MockRaftActor leaderActor = mockActorRef.underlyingActor();
875         leaderActor.getRaftActorContext().setCommitIndex(3);
876         leaderActor.getRaftActorContext().setLastApplied(3);
877         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
878         leaderActor.getReplicatedLog().setSnapshotIndex(3);
879
880         leaderActor.waitForInitializeBehaviorComplete();
881         Leader leader = new Leader(leaderActor.getRaftActorContext());
882         leaderActor.setCurrentBehavior(leader);
883         leader.setReplicatedToAllIndex(3);
884         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
885
886         // Persist another entry (this will cause a CaptureSnapshot to be triggered
887         leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
888                 new MockRaftActorContext.MockPayload("duh"), false);
889
890         // Now send a CaptureSnapshotReply
891         mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
892                 java.util.Optional.empty()), mockActorRef);
893
894         // Trimming log in this scenario is a no-op
895         assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
896         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
897         assertEquals(3, leader.getReplicatedToAllIndex());
898     }
899
900     @Test
901     public void testSwitchBehavior() {
902         String persistenceId = factory.generateActorId("leader-");
903         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
904         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
905         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
906         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
907         config.setSnapshotBatchCount(5);
908
909         DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
910
911         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
912
913         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
914                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
915
916         MockRaftActor leaderActor = mockActorRef.underlyingActor();
917
918         leaderActor.waitForRecoveryComplete();
919
920         leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
921
922         assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
923         assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
924
925         leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
926
927         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
928         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
929
930         leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
931
932         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
933         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
934
935         leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
936
937         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
938         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
939     }
940
941     public static ByteString fromObject(Object snapshot) throws Exception {
942         ByteArrayOutputStream bos = null;
943         ObjectOutputStream os = null;
944         try {
945             bos = new ByteArrayOutputStream();
946             os = new ObjectOutputStream(bos);
947             os.writeObject(snapshot);
948             byte[] snapshotBytes = bos.toByteArray();
949             return ByteString.copyFrom(snapshotBytes);
950         } finally {
951             if (os != null) {
952                 os.flush();
953                 os.close();
954             }
955             if (bos != null) {
956                 bos.close();
957             }
958         }
959     }
960
961     @Test
962     public void testUpdateConfigParam() throws Exception {
963         DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
964         String persistenceId = factory.generateActorId("follower-");
965         ImmutableMap<String, String> peerAddresses =
966             ImmutableMap.<String, String>builder().put("member1", "address").build();
967         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
968
969         TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
970                 MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
971         MockRaftActor mockRaftActor = actorRef.underlyingActor();
972         mockRaftActor.waitForInitializeBehaviorComplete();
973
974         RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior();
975         mockRaftActor.updateConfigParams(emptyConfig);
976         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
977         assertEquals("Behavior State", RaftState.Follower,
978             mockRaftActor.getCurrentBehavior().state());
979
980         DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl();
981         disableConfig.setCustomRaftPolicyImplementationClass(
982             "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
983         mockRaftActor.updateConfigParams(disableConfig);
984         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
985         assertEquals("Behavior State", RaftState.Follower,
986             mockRaftActor.getCurrentBehavior().state());
987
988         behavior = mockRaftActor.getCurrentBehavior();
989         mockRaftActor.updateConfigParams(disableConfig);
990         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
991         assertEquals("Behavior State", RaftState.Follower,
992             mockRaftActor.getCurrentBehavior().state());
993
994         DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl();
995         defaultConfig.setCustomRaftPolicyImplementationClass(
996             "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy");
997         mockRaftActor.updateConfigParams(defaultConfig);
998         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
999         assertEquals("Behavior State", RaftState.Follower,
1000             mockRaftActor.getCurrentBehavior().state());
1001
1002         behavior = mockRaftActor.getCurrentBehavior();
1003         mockRaftActor.updateConfigParams(defaultConfig);
1004         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1005         assertEquals("Behavior State", RaftState.Follower,
1006             mockRaftActor.getCurrentBehavior().state());
1007     }
1008
1009     @Test
1010     public void testGetSnapshot() throws Exception {
1011         TEST_LOG.info("testGetSnapshot starting");
1012
1013         final JavaTestKit kit = new JavaTestKit(getSystem());
1014
1015         String persistenceId = factory.generateActorId("test-actor-");
1016         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1017         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1018
1019         long term = 3;
1020         long seqN = 1;
1021         InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
1022         InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(0, term,
1023                 new MockRaftActorContext.MockPayload("A")));
1024         InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(1, term,
1025                 new MockRaftActorContext.MockPayload("B")));
1026         InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
1027         InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(2, term,
1028                 new MockRaftActorContext.MockPayload("C")));
1029
1030         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
1031                 ImmutableMap.<String, String>builder().put("member1", "address").build(), config)
1032                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1033         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1034
1035         mockRaftActor.waitForRecoveryComplete();
1036
1037         mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
1038
1039         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1040
1041         ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
1042         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture(),
1043                 eq(java.util.Optional.empty()));
1044
1045         byte[] stateSnapshot = new byte[]{1,2,3};
1046         replyActor.getValue().tell(new CaptureSnapshotReply(ByteState.of(stateSnapshot), java.util.Optional.empty()),
1047                 ActorRef.noSender());
1048
1049         GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
1050
1051         assertEquals("getId", persistenceId, reply.getId());
1052         Snapshot replySnapshot = reply.getSnapshot();
1053         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1054         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1055         assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
1056         assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
1057         assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
1058         assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
1059         assertEquals("getState", ByteState.of(stateSnapshot), replySnapshot.getState());
1060         assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
1061         assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
1062
1063         // Test with timeout
1064
1065         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(
1066                 Duration.create(200, TimeUnit.MILLISECONDS));
1067         reset(mockRaftActor.snapshotCohortDelegate);
1068
1069         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1070         Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
1071         assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
1072
1073         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
1074
1075         // Test with persistence disabled.
1076
1077         mockRaftActor.setPersistence(false);
1078         reset(mockRaftActor.snapshotCohortDelegate);
1079
1080         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1081         reply = kit.expectMsgClass(GetSnapshotReply.class);
1082         verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(anyObject(), anyObject());
1083
1084         assertEquals("getId", persistenceId, reply.getId());
1085         replySnapshot = reply.getSnapshot();
1086         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1087         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1088         assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
1089         assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
1090         assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
1091         assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
1092         assertEquals("getState type", EmptyState.INSTANCE, replySnapshot.getState());
1093         assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
1094
1095         TEST_LOG.info("testGetSnapshot ending");
1096     }
1097
1098     @Test
1099     public void testRestoreFromSnapshot() throws Exception {
1100         TEST_LOG.info("testRestoreFromSnapshot starting");
1101
1102         String persistenceId = factory.generateActorId("test-actor-");
1103         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1104         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1105
1106         List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
1107         snapshotUnappliedEntries.add(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E")));
1108
1109         int snapshotLastApplied = 3;
1110         int snapshotLastIndex = 4;
1111
1112         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(
1113                 new MockRaftActorContext.MockPayload("A"),
1114                 new MockRaftActorContext.MockPayload("B"),
1115                 new MockRaftActorContext.MockPayload("C"),
1116                 new MockRaftActorContext.MockPayload("D")));
1117
1118         Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries,
1119                 snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1", null);
1120
1121         InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
1122
1123         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1124                 .config(config).restoreFromSnapshot(snapshot).props()
1125                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1126         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1127
1128         mockRaftActor.waitForRecoveryComplete();
1129
1130         Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class);
1131         assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm());
1132         assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor());
1133         assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex());
1134         assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
1135         assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
1136         assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
1137         assertEquals("getState", snapshot.getState(), savedSnapshot.getState());
1138         assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
1139
1140         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(Snapshot.State.class));
1141
1142         RaftActorContext context = mockRaftActor.getRaftActorContext();
1143         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1144         assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
1145         assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
1146         assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
1147         assertEquals("Recovered state", snapshotState.getState(), mockRaftActor.getState());
1148         assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
1149         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1150
1151         // Test with data persistence disabled
1152
1153         snapshot = Snapshot.create(EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
1154                 -1, -1, -1, -1, 5, "member-1", null);
1155
1156         persistenceId = factory.generateActorId("test-actor-");
1157
1158         raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1159                 .config(config).restoreFromSnapshot(snapshot)
1160                 .persistent(Optional.of(Boolean.FALSE)).props()
1161                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1162         mockRaftActor = raftActorRef.underlyingActor();
1163
1164         mockRaftActor.waitForRecoveryComplete();
1165         assertEquals("snapshot committed", true,
1166                 Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS));
1167
1168         context = mockRaftActor.getRaftActorContext();
1169         assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm());
1170         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1171
1172         TEST_LOG.info("testRestoreFromSnapshot ending");
1173     }
1174
1175     @Test
1176     public void testRestoreFromSnapshotWithRecoveredData() throws Exception {
1177         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting");
1178
1179         String persistenceId = factory.generateActorId("test-actor-");
1180         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1181         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1182
1183         List<MockPayload> state = Arrays.asList(new MockRaftActorContext.MockPayload("A"));
1184         Snapshot snapshot = Snapshot.create(ByteState.of(fromObject(state).toByteArray()),
1185                 Arrays.<ReplicatedLogEntry>asList(), 5, 2, 5, 2, 2, "member-1", null);
1186
1187         InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1,
1188                 new MockRaftActorContext.MockPayload("B")));
1189
1190         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1191                 .config(config).restoreFromSnapshot(snapshot).props()
1192                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1193         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1194
1195         mockRaftActor.waitForRecoveryComplete();
1196
1197         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1198         verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(Snapshot.State.class));
1199
1200         RaftActorContext context = mockRaftActor.getRaftActorContext();
1201         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1202         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
1203         assertEquals("Last applied", -1, context.getLastApplied());
1204         assertEquals("Commit index", -1, context.getCommitIndex());
1205         assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
1206         assertEquals("Voted for", null, context.getTermInformation().getVotedFor());
1207
1208         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
1209     }
1210
1211     @Test
1212     public void testNonVotingOnRecovery() throws Exception {
1213         TEST_LOG.info("testNonVotingOnRecovery starting");
1214
1215         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1216         config.setElectionTimeoutFactor(1);
1217         config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
1218
1219         String persistenceId = factory.generateActorId("test-actor-");
1220         InMemoryJournal.addEntry(persistenceId, 1,  new SimpleReplicatedLogEntry(0, 1,
1221                 new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false)))));
1222
1223         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1224                 .config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1225         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1226
1227         mockRaftActor.waitForInitializeBehaviorComplete();
1228
1229         // Sleep a bit and verify it didn't get an election timeout and schedule an election.
1230
1231         Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
1232         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
1233
1234         TEST_LOG.info("testNonVotingOnRecovery ending");
1235     }
1236
1237     @Test
1238     public void testLeaderTransitioning() throws Exception {
1239         TEST_LOG.info("testLeaderTransitioning starting");
1240
1241         TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
1242                 Props.create(MessageCollectorActor.class));
1243
1244         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1245         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1246
1247         String persistenceId = factory.generateActorId("test-actor-");
1248
1249         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1250                 .config(config).roleChangeNotifier(notifierActor).props()
1251                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1252         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1253
1254         mockRaftActor.waitForInitializeBehaviorComplete();
1255
1256         raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.<ReplicatedLogEntry>emptyList(),
1257                 0L, -1L, (short)1), ActorRef.noSender());
1258         LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1259                 notifierActor, LeaderStateChanged.class);
1260         assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
1261
1262         MessageCollectorActor.clearMessages(notifierActor);
1263
1264         raftActorRef.tell(new LeaderTransitioning("leader"), ActorRef.noSender());
1265
1266         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1267         assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
1268         assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
1269
1270         TEST_LOG.info("testLeaderTransitioning ending");
1271     }
1272
1273     @SuppressWarnings({ "unchecked", "rawtypes" })
1274     @Test
1275     public void testReplicateWithPersistencePending() throws Exception {
1276         final String leaderId = factory.generateActorId("leader-");
1277         final String followerId = factory.generateActorId("follower-");
1278
1279         final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
1280
1281         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1282         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1283         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1284
1285         DataPersistenceProvider mockPersistenceProvider = mock(DataPersistenceProvider.class);
1286         doReturn(true).when(mockPersistenceProvider).isRecoveryApplicable();
1287
1288         TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
1289                 MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config,
1290                         mockPersistenceProvider), leaderId);
1291         MockRaftActor leaderActor = leaderActorRef.underlyingActor();
1292         leaderActor.waitForInitializeBehaviorComplete();
1293
1294         leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
1295
1296         Leader leader = new Leader(leaderActor.getRaftActorContext());
1297         leaderActor.setCurrentBehavior(leader);
1298
1299         leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"),
1300                 false);
1301
1302         ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
1303         assertNotNull("ReplicatedLogEntry not found", logEntry);
1304         assertEquals("isPersistencePending", true, logEntry.isPersistencePending());
1305         assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
1306
1307         leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
1308         assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
1309
1310         ArgumentCaptor<Procedure> callbackCaptor = ArgumentCaptor.forClass(Procedure.class);
1311         verify(mockPersistenceProvider).persistAsync(eq(logEntry), callbackCaptor.capture());
1312
1313         callbackCaptor.getValue().apply(logEntry);
1314
1315         assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
1316         assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
1317     }
1318
1319     @Test
1320     public void testReplicateWithBatchHint() throws Exception {
1321         final String leaderId = factory.generateActorId("leader-");
1322         final String followerId = factory.generateActorId("follower-");
1323
1324         final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
1325
1326         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1327         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1328         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1329
1330         TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
1331                 MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config),
1332                     leaderId);
1333         MockRaftActor leaderActor = leaderActorRef.underlyingActor();
1334         leaderActor.waitForInitializeBehaviorComplete();
1335
1336         leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
1337
1338         Leader leader = new Leader(leaderActor.getRaftActorContext());
1339         leaderActor.setCurrentBehavior(leader);
1340
1341         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1342         MessageCollectorActor.clearMessages(followerActor);
1343
1344         leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0));
1345
1346         leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true);
1347         MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
1348
1349         leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true);
1350         MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
1351
1352         leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false);
1353         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1354         assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
1355     }
1356 }