Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNotSame;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.ArgumentMatchers.eq;
19 import static org.mockito.ArgumentMatchers.same;
20 import static org.mockito.Mockito.doReturn;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.timeout;
25 import static org.mockito.Mockito.verify;
26 import static org.mockito.Mockito.when;
27
28 import akka.actor.ActorRef;
29 import akka.actor.PoisonPill;
30 import akka.actor.Status.Failure;
31 import akka.actor.Terminated;
32 import akka.dispatch.Dispatchers;
33 import akka.japi.Procedure;
34 import akka.persistence.SaveSnapshotFailure;
35 import akka.persistence.SaveSnapshotSuccess;
36 import akka.persistence.SnapshotMetadata;
37 import akka.persistence.SnapshotOffer;
38 import akka.protobuf.ByteString;
39 import akka.testkit.TestActorRef;
40 import akka.testkit.javadsl.TestKit;
41 import com.google.common.util.concurrent.Uninterruptibles;
42 import java.io.ByteArrayOutputStream;
43 import java.io.ObjectOutputStream;
44 import java.time.Duration;
45 import java.util.List;
46 import java.util.Map;
47 import java.util.Optional;
48 import java.util.concurrent.ExecutorService;
49 import java.util.concurrent.Executors;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import org.junit.After;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.mockito.ArgumentCaptor;
56 import org.opendaylight.controller.cluster.DataPersistenceProvider;
57 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
58 import org.opendaylight.controller.cluster.PersistentDataProvider;
59 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
60 import org.opendaylight.controller.cluster.notifications.RoleChanged;
61 import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestPersist;
62 import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestRaftActor;
63 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
64 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
65 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
66 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
67 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
69 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
70 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
71 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
72 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
73 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
74 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
75 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
76 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
77 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
78 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
79 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
80 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
81 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
82 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
83 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
84 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
85 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
86 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
87 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
88 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
89 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
90 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
91 import org.opendaylight.yangtools.concepts.Identifier;
92 import org.slf4j.Logger;
93 import org.slf4j.LoggerFactory;
94 import scala.concurrent.duration.FiniteDuration;
95
96 public class RaftActorTest extends AbstractActorTest {
97
98     static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
99
100     private TestActorFactory factory;
101
102     @Before
103     public void setUp() {
104         factory = new TestActorFactory(getSystem());
105     }
106
107     @After
108     public void tearDown() {
109         factory.close();
110         InMemoryJournal.clear();
111         InMemorySnapshotStore.clear();
112     }
113
114     @Test
115     public void testConstruction() {
116         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
117     }
118
119     @Test
120     public void testFindLeaderWhenLeaderIsSelf() {
121         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
122         kit.waitUntilLeader();
123     }
124
125
126     @Test
127     public void testRaftActorRecoveryWithPersistenceEnabled() {
128         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
129
130         TestKit kit = new TestKit(getSystem());
131         String persistenceId = factory.generateActorId("follower-");
132
133         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
134
135         // Set the heartbeat interval high to essentially disable election otherwise the test
136         // may fail if the actor is switched to Leader and the commitIndex is set to the last
137         // log entry.
138         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
139
140         Map<String, String> peerAddresses = Map.of("member1", "address");
141         ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
142                 peerAddresses, config), persistenceId);
143
144         kit.watch(followerActor);
145
146         List<ReplicatedLogEntry> snapshotUnappliedEntries = List.of(
147             new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E")));
148
149         int lastAppliedDuringSnapshotCapture = 3;
150         int lastIndexDuringSnapshotCapture = 4;
151
152         // 4 messages as part of snapshot, which are applied to state
153         MockSnapshotState snapshotState = new MockSnapshotState(List.of(
154                 new MockRaftActorContext.MockPayload("A"),
155                 new MockRaftActorContext.MockPayload("B"),
156                 new MockRaftActorContext.MockPayload("C"),
157                 new MockRaftActorContext.MockPayload("D")));
158
159         Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
160                 lastAppliedDuringSnapshotCapture, 1, -1, null, null);
161         InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
162
163         // add more entries after snapshot is taken
164         ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F", 2));
165         ReplicatedLogEntry entry3 = new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("G", 3));
166         ReplicatedLogEntry entry4 = new SimpleReplicatedLogEntry(7, 1, new MockRaftActorContext.MockPayload("H", 4));
167
168         final int lastAppliedToState = 5;
169         final int lastIndex = 7;
170
171         InMemoryJournal.addEntry(persistenceId, 5, entry2);
172         // 2 entries are applied to state besides the 4 entries in snapshot
173         InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
174         InMemoryJournal.addEntry(persistenceId, 7, entry3);
175         InMemoryJournal.addEntry(persistenceId, 8, entry4);
176
177         // kill the actor
178         followerActor.tell(PoisonPill.getInstance(), null);
179         kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
180
181         kit.unwatch(followerActor);
182
183         //reinstate the actor
184         TestActorRef<MockRaftActor> ref = factory.createTestActor(
185                 MockRaftActor.props(persistenceId, peerAddresses, config));
186
187         MockRaftActor mockRaftActor = ref.underlyingActor();
188
189         mockRaftActor.waitForRecoveryComplete();
190
191         RaftActorContext context = mockRaftActor.getRaftActorContext();
192         assertEquals("Journal log size", snapshotUnappliedEntries.size() + 3,
193                 context.getReplicatedLog().size());
194         assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
195         assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
196         assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
197         assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
198         assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
199
200         mockRaftActor.waitForInitializeBehaviorComplete();
201
202         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
203
204         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
205     }
206
207     @Test
208     public void testRaftActorRecoveryWithPersistenceDisabled() {
209         String persistenceId = factory.generateActorId("follower-");
210
211         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
212
213         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
214
215         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
216                 Map.of("member1", "address"), config, createProvider()), persistenceId);
217
218         MockRaftActor mockRaftActor = ref.underlyingActor();
219
220         mockRaftActor.waitForRecoveryComplete();
221
222         mockRaftActor.waitForInitializeBehaviorComplete();
223
224         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
225     }
226
227     @Test
228     public void testUpdateElectionTermPersistedWithPersistenceDisabled() {
229         final TestKit kit = new TestKit(getSystem());
230         String persistenceId = factory.generateActorId("follower-");
231         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
232         config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
233         config.setElectionTimeoutFactor(1);
234
235         InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
236
237         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
238                 Map.of("member1", "address"), config, createProvider())
239                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
240
241         InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
242         List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
243         assertEquals("UpdateElectionTerm entries", 1, entries.size());
244         final UpdateElectionTerm updateEntry = entries.get(0);
245
246         factory.killActor(ref, kit);
247
248         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
249         ref = factory.createTestActor(MockRaftActor.props(persistenceId, Map.of("member1", "address"), config,
250                 createProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
251                 factory.generateActorId("follower-"));
252
253         MockRaftActor actor = ref.underlyingActor();
254         actor.waitForRecoveryComplete();
255
256         RaftActorContext newContext = actor.getRaftActorContext();
257         assertEquals("electionTerm", updateEntry.getCurrentTerm(),
258                 newContext.getTermInformation().getCurrentTerm());
259         assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
260
261         entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
262         assertEquals("UpdateElectionTerm entries", 1, entries.size());
263     }
264
265     @Test
266     public void testRaftActorForwardsToRaftActorRecoverySupport() {
267         String persistenceId = factory.generateActorId("leader-");
268
269         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
270
271         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
272
273         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
274                 Map.of(), config), persistenceId);
275
276         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
277
278         // Wait for akka's recovery to complete so it doesn't interfere.
279         mockRaftActor.waitForRecoveryComplete();
280
281         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
282         mockRaftActor.setRaftActorRecoverySupport(mockSupport);
283
284         Snapshot snapshot = Snapshot.create(ByteState.of(new byte[]{1}),
285                 List.of(), 3, 1, 3, 1, -1, null, null);
286         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
287         mockRaftActor.handleRecover(snapshotOffer);
288
289         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
290         mockRaftActor.handleRecover(logEntry);
291
292         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
293         mockRaftActor.handleRecover(applyJournalEntries);
294
295         DeleteEntries deleteEntries = new DeleteEntries(1);
296         mockRaftActor.handleRecover(deleteEntries);
297
298         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
299         mockRaftActor.handleRecover(updateElectionTerm);
300
301         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
302         verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
303         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
304         verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
305         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
306     }
307
308     @Test
309     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
310         String persistenceId = factory.generateActorId("leader-");
311
312         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
313
314         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
315
316         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
317
318         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
319                 .config(config).snapshotMessageSupport(mockSupport).props());
320
321         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
322
323         // Wait for akka's recovery to complete so it doesn't interfere.
324         mockRaftActor.waitForRecoveryComplete();
325
326         ApplySnapshot applySnapshot = new ApplySnapshot(
327             Snapshot.create(null, null, 0, 0, 0, 0, 0, persistenceId, null));
328         when(mockSupport.handleSnapshotMessage(same(applySnapshot), any(ActorRef.class))).thenReturn(true);
329         mockRaftActor.handleCommand(applySnapshot);
330
331         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(ByteState.empty(), Optional.empty());
332         when(mockSupport.handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class))).thenReturn(true);
333         mockRaftActor.handleCommand(captureSnapshotReply);
334
335         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L));
336         when(mockSupport.handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class))).thenReturn(true);
337         mockRaftActor.handleCommand(saveSnapshotSuccess);
338
339         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L),
340                 new Throwable());
341         when(mockSupport.handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class))).thenReturn(true);
342         mockRaftActor.handleCommand(saveSnapshotFailure);
343
344         when(mockSupport.handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
345             any(ActorRef.class))).thenReturn(true);
346         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
347
348         when(mockSupport.handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class))).thenReturn(true);
349         mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
350
351         verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
352         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
353         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
354         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
355         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
356                 any(ActorRef.class));
357         verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
358     }
359
360     @SuppressWarnings("unchecked")
361     @Test
362     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
363         String persistenceId = factory.generateActorId("leader-");
364
365         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
366
367         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
368
369         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
370
371         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
372                 Map.of(), config, dataPersistenceProvider), persistenceId);
373
374         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
375
376         mockRaftActor.waitForInitializeBehaviorComplete();
377
378         mockRaftActor.waitUntilLeader();
379
380         mockRaftActor.handleCommand(new ApplyJournalEntries(10));
381
382         verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Procedure.class));
383     }
384
385     @Test
386     public void testApplyState() {
387         String persistenceId = factory.generateActorId("leader-");
388
389         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
390
391         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
392
393         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
394
395         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
396                 Map.of(), config, dataPersistenceProvider), persistenceId);
397
398         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
399
400         mockRaftActor.waitForInitializeBehaviorComplete();
401
402         ReplicatedLogEntry entry = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F"));
403
404         final Identifier id = new MockIdentifier("apply-state");
405         mockRaftActor.getRaftActorContext().getApplyStateConsumer().accept(new ApplyState(mockActorRef, id, entry));
406
407         verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), any());
408     }
409
410     @Test
411     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
412         ActorRef notifierActor = factory.createActor(MessageCollectorActor.props());
413         MessageCollectorActor.waitUntilReady(notifierActor);
414
415         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
416         long heartBeatInterval = 100;
417         config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
418         config.setElectionTimeoutFactor(20);
419
420         String persistenceId = factory.generateActorId("notifier-");
421
422         final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
423                 .id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
424                     createProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
425                 persistenceId);
426
427         List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
428
429
430         // check if the notifier got a role change from null to Follower
431         RoleChanged raftRoleChanged = matches.get(0);
432         assertEquals(persistenceId, raftRoleChanged.getMemberId());
433         assertNull(raftRoleChanged.getOldRole());
434         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
435
436         // check if the notifier got a role change from Follower to Candidate
437         raftRoleChanged = matches.get(1);
438         assertEquals(persistenceId, raftRoleChanged.getMemberId());
439         assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
440         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
441
442         // check if the notifier got a role change from Candidate to Leader
443         raftRoleChanged = matches.get(2);
444         assertEquals(persistenceId, raftRoleChanged.getMemberId());
445         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
446         assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
447
448         LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
449                 notifierActor, LeaderStateChanged.class);
450
451         assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
452         assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
453
454         MessageCollectorActor.clearMessages(notifierActor);
455
456         MockRaftActor raftActor = raftActorRef.underlyingActor();
457         final String newLeaderId = "new-leader";
458         final short newLeaderVersion = 6;
459         Follower follower = new Follower(raftActor.getRaftActorContext()) {
460             @Override
461             public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
462                 setLeaderId(newLeaderId);
463                 setLeaderPayloadVersion(newLeaderVersion);
464                 return this;
465             }
466         };
467
468         raftActor.newBehavior(follower);
469
470         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
471         assertEquals(persistenceId, leaderStateChange.getMemberId());
472         assertEquals(null, leaderStateChange.getLeaderId());
473
474         raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
475         assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
476         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
477
478         MessageCollectorActor.clearMessages(notifierActor);
479
480         raftActor.handleCommand("any");
481
482         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
483         assertEquals(persistenceId, leaderStateChange.getMemberId());
484         assertEquals(newLeaderId, leaderStateChange.getLeaderId());
485         assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
486
487         MessageCollectorActor.clearMessages(notifierActor);
488
489         raftActor.handleCommand("any");
490
491         Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
492         leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
493         assertNull(leaderStateChange);
494     }
495
496     @Test
497     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
498         ActorRef notifierActor = factory.createActor(MessageCollectorActor.props());
499         MessageCollectorActor.waitUntilReady(notifierActor);
500
501         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
502         long heartBeatInterval = 100;
503         config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
504         config.setElectionTimeoutFactor(1);
505
506         String persistenceId = factory.generateActorId("notifier-");
507
508         factory.createActor(MockRaftActor.builder().id(persistenceId)
509                 .peerAddresses(Map.of("leader", "fake/path"))
510                 .config(config).roleChangeNotifier(notifierActor).props());
511
512         List<RoleChanged> matches =  null;
513         for (int i = 0; i < 5000 / heartBeatInterval; i++) {
514             matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
515             assertNotNull(matches);
516             if (matches.size() == 3) {
517                 break;
518             }
519             Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
520         }
521
522         assertNotNull(matches);
523         assertEquals(2, matches.size());
524
525         // check if the notifier got a role change from null to Follower
526         RoleChanged raftRoleChanged = matches.get(0);
527         assertEquals(persistenceId, raftRoleChanged.getMemberId());
528         assertNull(raftRoleChanged.getOldRole());
529         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
530
531         // check if the notifier got a role change from Follower to Candidate
532         raftRoleChanged = matches.get(1);
533         assertEquals(persistenceId, raftRoleChanged.getMemberId());
534         assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
535         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
536     }
537
538     @Test
539     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
540         final String persistenceId = factory.generateActorId("leader-");
541         final String follower1Id = factory.generateActorId("follower-");
542
543         ActorRef followerActor1 = factory.createActor(MessageCollectorActor.props());
544
545         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
546         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
547         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
548
549         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
550
551         Map<String, String> peerAddresses = Map.of(follower1Id, followerActor1.path().toString());
552
553         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
554                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
555
556         MockRaftActor leaderActor = mockActorRef.underlyingActor();
557
558         leaderActor.getRaftActorContext().setCommitIndex(4);
559         leaderActor.getRaftActorContext().setLastApplied(4);
560         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
561
562         leaderActor.waitForInitializeBehaviorComplete();
563
564         // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
565
566         Leader leader = new Leader(leaderActor.getRaftActorContext());
567         leaderActor.setCurrentBehavior(leader);
568         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
569
570         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
571         leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
572
573         assertEquals(8, leaderActor.getReplicatedLog().size());
574
575         leaderActor.getRaftActorContext().getSnapshotManager().capture(
576                 new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("x")), 4);
577
578         verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(), any());
579
580         assertEquals(8, leaderActor.getReplicatedLog().size());
581
582         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
583         //fake snapshot on index 5
584         leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
585
586         assertEquals(8, leaderActor.getReplicatedLog().size());
587
588         //fake snapshot on index 6
589         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
590         leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
591         assertEquals(8, leaderActor.getReplicatedLog().size());
592
593         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
594
595         assertEquals(8, leaderActor.getReplicatedLog().size());
596
597         MockSnapshotState snapshotState = new MockSnapshotState(List.of(
598                 new MockRaftActorContext.MockPayload("foo-0"),
599                 new MockRaftActorContext.MockPayload("foo-1"),
600                 new MockRaftActorContext.MockPayload("foo-2"),
601                 new MockRaftActorContext.MockPayload("foo-3"),
602                 new MockRaftActorContext.MockPayload("foo-4")));
603
604         leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotState, Optional.empty(),
605                 Runtime.getRuntime().totalMemory());
606
607         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
608
609         // The commit is needed to complete the snapshot creation process
610         leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
611
612         // capture snapshot reply should remove the snapshotted entries only
613         assertEquals(3, leaderActor.getReplicatedLog().size());
614         assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
615
616         // add another non-replicated entry
617         leaderActor.getReplicatedLog().append(
618                 new SimpleReplicatedLogEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
619
620         //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
621         leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
622         assertEquals(2, leaderActor.getReplicatedLog().size());
623         assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
624     }
625
626     @Test
627     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
            // Checks how a follower's replicated log is trimmed around a real snapshot capture:
            // AppendEntries handled while the capture is outstanding only grow the log, the log is
            // trimmed once the capture reply has been processed and the snapshot committed, and a
            // later AppendEntries trims it further.
628         final String persistenceId = factory.generateActorId("follower-");
629         final String leaderId = factory.generateActorId("leader-");
630
631         ActorRef leaderActor1 = factory.createActor(MessageCollectorActor.props());
632
633         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
            // One-day intervals - presumably so that no timer-driven message fires during the test.
634         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
635         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
636
637         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
638
639         Map<String, String> peerAddresses = Map.of(leaderId, leaderActor1.path().toString());
640
641         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
642                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
643
644         MockRaftActor followerActor = mockActorRef.underlyingActor();
645         followerActor.getRaftActorContext().setCommitIndex(4);
646         followerActor.getRaftActorContext().setLastApplied(4);
647         followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
648
649         followerActor.waitForInitializeBehaviorComplete();
650
651
            // Install a fresh Follower behavior explicitly so the test starts from a known state.
652         Follower follower = new Follower(followerActor.getRaftActorContext());
653         followerActor.setCurrentBehavior(follower);
654         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
655
656         // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
657         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
658         followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
659
660         // log has indices 0-5
661         assertEquals(6, followerActor.getReplicatedLog().size());
662
663         //snapshot on 4
664         followerActor.getRaftActorContext().getSnapshotManager().capture(
665                 new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("D")), 4);
666
            // Starting the capture must have asked the snapshot cohort to create a snapshot.
667         verify(followerActor.snapshotCohortDelegate).createSnapshot(any(), any());
668
            // Starting the capture does not trim anything by itself.
669         assertEquals(6, followerActor.getReplicatedLog().size());
670
671         //fake snapshot on index 6
672         List<ReplicatedLogEntry> entries = List.of(
673                 new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("foo-6")));
674         followerActor.handleCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
            // The new entry is appended and nothing is trimmed while the capture is outstanding.
675         assertEquals(7, followerActor.getReplicatedLog().size());
676
677         //fake snapshot on index 7
678         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
679
680         entries = List.of(new SimpleReplicatedLogEntry(7, 1,
681                 new MockRaftActorContext.MockPayload("foo-7")));
682         followerActor.handleCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
683         assertEquals(8, followerActor.getReplicatedLog().size());
684
685         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
686
687
            // Deliver the serialized state for entries 0-4 as the reply to the outstanding capture.
688         ByteString snapshotBytes = fromObject(List.of(
689                 new MockRaftActorContext.MockPayload("foo-0"),
690                 new MockRaftActorContext.MockPayload("foo-1"),
691                 new MockRaftActorContext.MockPayload("foo-2"),
692                 new MockRaftActorContext.MockPayload("foo-3"),
693                 new MockRaftActorContext.MockPayload("foo-4")));
694         followerActor.handleCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
695                 Optional.empty()));
            // The reply alone does not end the capture; the explicit commit below does.
696         assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
697
698         // The commit is needed to complete the snapshot creation process
699         followerActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
700
701         // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
702         assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
703         assertEquals(7, followerActor.getReplicatedLog().lastIndex());
704
            // NOTE(review): the payload below is labelled "foo-7" although the entry index is 8;
            // looks like a copy/paste leftover - the assertions do not depend on the payload text.
705         entries = List.of(new SimpleReplicatedLogEntry(8, 1, new MockRaftActorContext.MockPayload("foo-7")));
706         // send an additional entry 8 with leaderCommit = 7
707         followerActor.handleCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
708
709         // 7 and 8, as lastapplied is 7
710         assertEquals(2, followerActor.getReplicatedLog().size());
711     }
712
713     @Test
714     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
            // Checks that a leader with two followers, one of which reports index 0, leaves its log
            // untouched through follower replies and a heartbeat, and clears it only when the
            // CaptureSnapshotReply is handled.
715         final String persistenceId = factory.generateActorId("leader-");
716         final String follower1Id = factory.generateActorId("follower-");
717         final String follower2Id = factory.generateActorId("follower-");
718
719         final ActorRef followerActor1 = factory.createActor(MessageCollectorActor.props(), follower1Id);
720         final ActorRef followerActor2 = factory.createActor(MessageCollectorActor.props(), follower2Id);
721
722         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
            // One-day intervals - presumably so that no timer-driven message fires during the test.
723         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
724         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
725
726         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
727
728         Map<String, String> peerAddresses = Map.of(
729             follower1Id, followerActor1.path().toString(),
730             follower2Id, followerActor2.path().toString());
731
732         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
733                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
734
735         MockRaftActor leaderActor = mockActorRef.underlyingActor();
736         leaderActor.getRaftActorContext().setCommitIndex(9);
737         leaderActor.getRaftActorContext().setLastApplied(9);
738         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
739
740         leaderActor.waitForInitializeBehaviorComplete();
741
            // Install a fresh Leader behavior explicitly so the test starts from a known state.
742         Leader leader = new Leader(leaderActor.getRaftActorContext());
743         leaderActor.setCurrentBehavior(leader);
744         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
745
746         // create 5 entries in the log
747         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
748         leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
749
750         //set the snapshot index to 4 , 0 to 4 are snapshotted
751         leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
752         //setting replicatedToAllIndex = 9, for the log to clear
753         leader.setReplicatedToAllIndex(9);
754         assertEquals(5, leaderActor.getReplicatedLog().size());
755         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
756
            // The first follower reports index 9; the log must stay at 5 entries.
757         leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
758         assertEquals(5, leaderActor.getReplicatedLog().size());
759         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
760
761         // set the 2nd follower nextIndex to 1 which has been snapshotted
762         leaderActor.handleCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
763         assertEquals(5, leaderActor.getReplicatedLog().size());
764         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
765
766         // simulate a real snapshot
767         leaderActor.handleCommand(SendHeartBeat.INSTANCE);
768         assertEquals(5, leaderActor.getReplicatedLog().size());
            // The failure message reports the actual state and the current leader id.
769         assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
770                 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()),
771                 RaftState.Leader, leaderActor.getCurrentBehavior().state());
772
773
774         //reply from a slow follower does not initiate a fake snapshot
775         leaderActor.handleCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
776         assertEquals("Fake snapshot should not happen when Initiate is in progress", 5,
777                 leaderActor.getReplicatedLog().size());
778
            // Deliver the serialized state as the reply to the outstanding capture.
779         ByteString snapshotBytes = fromObject(List.of(
780                 new MockRaftActorContext.MockPayload("foo-0"),
781                 new MockRaftActorContext.MockPayload("foo-1"),
782                 new MockRaftActorContext.MockPayload("foo-2"),
783                 new MockRaftActorContext.MockPayload("foo-3"),
784                 new MockRaftActorContext.MockPayload("foo-4")));
785         leaderActor.handleCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
786                 Optional.empty()));
            // The snapshot manager still reports a capture in progress after the reply;
            // this test never commits the snapshot.
787         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
788
789         assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0,
790                 leaderActor.getReplicatedLog().size());
791
792         //reply from a slow follower after should not raise errors
793         leaderActor.handleCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
794         assertEquals(0, leaderActor.getReplicatedLog().size());
795     }
796
797     @Test
798     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
            // Checks that handling a CaptureSnapshotReply leaves the snapshot index and the leader's
            // replicatedToAllIndex at -1 when the capture was started with -1 (captureToInstall).
799         String persistenceId = factory.generateActorId("leader-");
800         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
            // One-day intervals - presumably so that no timer-driven message fires during the test.
801         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
802         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
803         config.setSnapshotBatchCount(5);
804
805         DataPersistenceProvider dataPersistenceProvider = createProvider();
806
807         Map<String, String> peerAddresses = Map.of("member1", "address");
808
809         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
810                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
811
812         MockRaftActor leaderActor = mockActorRef.underlyingActor();
813         leaderActor.getRaftActorContext().setCommitIndex(3);
814         leaderActor.getRaftActorContext().setLastApplied(3);
815         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
816
817         leaderActor.waitForInitializeBehaviorComplete();
            // Populate the log with entries 0-3, matching the commit/lastApplied index set above.
818         for (int i = 0; i < 4; i++) {
819             leaderActor.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
820                     new MockRaftActorContext.MockPayload("A")));
821         }
822
823         Leader leader = new Leader(leaderActor.getRaftActorContext());
824         leaderActor.setCurrentBehavior(leader);
825         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
826
827         // Simulate an install snapshot to a follower.
828         leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
829                 leaderActor.getReplicatedLog().last(), -1, "member1");
830
831         // Now send a CaptureSnapshotReply
832         mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
833                 Optional.empty()), mockActorRef);
834
835         // Trimming log in this scenario is a no-op
836         assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
837         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
838         assertEquals(-1, leader.getReplicatedToAllIndex());
839     }
840
841     @Test
842     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
            // Checks that handling a CaptureSnapshotReply leaves the snapshot index and the leader's
            // replicatedToAllIndex at 3 when both were preset to 3 (the snapshot index) before a new
            // entry was persisted.
843         String persistenceId = factory.generateActorId("leader-");
844         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
            // One-day intervals - presumably so that no timer-driven message fires during the test.
845         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
846         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
847         config.setSnapshotBatchCount(5);
848
849         DataPersistenceProvider dataPersistenceProvider = createProvider();
850
851         Map<String, String> peerAddresses = Map.of("member1", "address");
852
853         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
854                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
855
856         MockRaftActor leaderActor = mockActorRef.underlyingActor();
857         leaderActor.getRaftActorContext().setCommitIndex(3);
858         leaderActor.getRaftActorContext().setLastApplied(3);
859         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
            // Only the snapshot index is set; this test appends no entries for indices 0-3.
860         leaderActor.getReplicatedLog().setSnapshotIndex(3);
861
862         leaderActor.waitForInitializeBehaviorComplete();
863         Leader leader = new Leader(leaderActor.getRaftActorContext());
864         leaderActor.setCurrentBehavior(leader);
865         leader.setReplicatedToAllIndex(3);
866         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
867
868         // Persist another entry (this will cause a CaptureSnapshot to be triggered)
869         leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
870                 new MockRaftActorContext.MockPayload("duh"), false);
871
872         // Now send a CaptureSnapshotReply
873         mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
874                 Optional.empty()), mockActorRef);
875
876         // Trimming log in this scenario is a no-op
877         assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
878         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
879         assertEquals(3, leader.getReplicatedToAllIndex());
880     }
881
882     private static DataPersistenceProvider createProvider() {
            // Builds the non-persistent provider used by tests that do not mock persistence.
            // Runnable::run presumably makes the provider invoke callbacks inline on the calling
            // thread - TODO confirm against NonPersistentDataProvider's constructor contract.
883         return new NonPersistentDataProvider(Runnable::run);
884     }
885
886     @Test
887     public void testSwitchBehavior() {
            // Checks SwitchBehavior handling: switching to Follower and to Leader changes both the
            // behavior and the current term, whereas requests for Candidate and IsolatedLeader
            // leave the behavior and term unchanged.
888         String persistenceId = factory.generateActorId("leader-");
889         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
890         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
891         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
892         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
893         config.setSnapshotBatchCount(5);
894
895         DataPersistenceProvider dataPersistenceProvider = createProvider();
896
            // No peers are configured for this actor.
897         Map<String, String> peerAddresses = Map.of();
898
899         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
900                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
901
902         MockRaftActor leaderActor = mockActorRef.underlyingActor();
903
904         leaderActor.waitForRecoveryComplete();
905
            // Follower at term 100: accepted.
906         leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
907
908         assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
909         assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
910
            // Leader at term 110: accepted.
911         leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
912
913         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
914         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
915
            // Candidate at term 125: behavior and term stay at Leader / 110.
916         leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
917
918         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
919         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
920
            // IsolatedLeader at term 125: behavior and term stay at Leader / 110 as well.
921         leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
922
923         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
924         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
925     }
926
927     public static ByteString fromObject(final Object snapshot) throws Exception {
928         ByteArrayOutputStream bos = null;
929         ObjectOutputStream os = null;
930         try {
931             bos = new ByteArrayOutputStream();
932             os = new ObjectOutputStream(bos);
933             os.writeObject(snapshot);
934             byte[] snapshotBytes = bos.toByteArray();
935             return ByteString.copyFrom(snapshotBytes);
936         } finally {
937             if (os != null) {
938                 os.flush();
939                 os.close();
940             }
941             if (bos != null) {
942                 bos.close();
943             }
944         }
945     }
946
947     @Test
948     public void testUpdateConfigParam() {
            // Checks updateConfigParams(): passing a config with the same raft policy keeps the
            // current behavior instance, while a config with a different custom raft policy class
            // yields a new behavior instance. The state stays Follower throughout.
949         DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
950         String persistenceId = factory.generateActorId("follower-");
951         Map<String, String> peerAddresses = Map.of("member1", "address");
952         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
953
954         TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
955                 MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
956         MockRaftActor mockRaftActor = actorRef.underlyingActor();
957         mockRaftActor.waitForInitializeBehaviorComplete();
958
            // Step 1: re-applying the config the actor was created with - same behavior instance.
959         RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior();
960         mockRaftActor.updateConfigParams(emptyConfig);
961         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
962         assertEquals("Behavior State", RaftState.Follower,
963             mockRaftActor.getCurrentBehavior().state());
964
            // Step 2: switching to DisableElectionsRaftPolicy - a different behavior instance.
965         DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl();
966         disableConfig.setCustomRaftPolicyImplementationClass(
967             "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
968         mockRaftActor.updateConfigParams(disableConfig);
969         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
970         assertEquals("Behavior State", RaftState.Follower,
971             mockRaftActor.getCurrentBehavior().state());
972
            // Step 3: applying the same policy again - behavior instance is kept.
973         behavior = mockRaftActor.getCurrentBehavior();
974         mockRaftActor.updateConfigParams(disableConfig);
975         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
976         assertEquals("Behavior State", RaftState.Follower,
977             mockRaftActor.getCurrentBehavior().state());
978
            // Step 4: switching to DefaultRaftPolicy - a different behavior instance again.
979         DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl();
980         defaultConfig.setCustomRaftPolicyImplementationClass(
981             "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy");
982         mockRaftActor.updateConfigParams(defaultConfig);
983         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
984         assertEquals("Behavior State", RaftState.Follower,
985             mockRaftActor.getCurrentBehavior().state());
986
            // Step 5: applying DefaultRaftPolicy again - behavior instance is kept.
987         behavior = mockRaftActor.getCurrentBehavior();
988         mockRaftActor.updateConfigParams(defaultConfig);
989         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
990         assertEquals("Behavior State", RaftState.Follower,
991             mockRaftActor.getCurrentBehavior().state());
992     }
993
994     @Test
995     public void testGetSnapshot() {
            // Checks the GetSnapshot request in three scenarios: (1) the cohort supplies state and a
            // GetSnapshotReply with the recovered term/index information is returned, (2) the cohort
            // never replies and the requester receives a Failure caused by a TimeoutException, and
            // (3) persistence is disabled, so the reply is built without calling the cohort.
996         TEST_LOG.info("testGetSnapshot starting");
997
998         final TestKit kit = new TestKit(getSystem());
999
1000         String persistenceId = factory.generateActorId("test-actor-");
1001         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1002         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1003
             // Seed the journal: election term 3 voted for "member-1", entries 0 and 1,
             // ApplyJournalEntries(1), then entry 2.
1004         long term = 3;
1005         long seqN = 1;
1006         InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
1007         InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(0, term,
1008                 new MockRaftActorContext.MockPayload("A")));
1009         InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(1, term,
1010                 new MockRaftActorContext.MockPayload("B")));
1011         InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
1012         InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(2, term,
1013                 new MockRaftActorContext.MockPayload("C")));
1014
1015         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
1016                 Map.of("member1", "address"), config)
1017                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1018         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1019
1020         mockRaftActor.waitForRecoveryComplete();
1021
             // Replace the cohort with a fresh mock so the createSnapshot() call can be captured.
1022         mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
1023
1024         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1025
1026         ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
1027         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture(),
1028                 eq(Optional.empty()));
1029
             // Play the cohort's part: send the state to the reply actor handed to createSnapshot().
1030         byte[] stateSnapshot = new byte[]{1,2,3};
1031         replyActor.getValue().tell(new CaptureSnapshotReply(ByteState.of(stateSnapshot), Optional.empty()),
1032                 ActorRef.noSender());
1033
1034         GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
1035
             // The reply must reflect the journal seeded above: applied up to index 1, with entry 2
             // reported as the single unapplied entry.
1036         assertEquals("getId", persistenceId, reply.getId());
1037         Snapshot replySnapshot = reply.getSnapshot();
1038         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1039         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1040         assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
1041         assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
1042         assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
1043         assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
1044         assertEquals("getState", ByteState.of(stateSnapshot), replySnapshot.getState());
1045         assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
1046         assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
1047
1048         // Test with timeout
1049
             // With a 200 ms reply timeout and a reset mock cohort that never answers, the request
             // must fail with a TimeoutException.
1050         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(
1051             FiniteDuration.create(200, TimeUnit.MILLISECONDS));
1052         reset(mockRaftActor.snapshotCohortDelegate);
1053
1054         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1055         Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
1056         assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
1057
             // Set a long (30 s) timeout again so the next scenario is not affected by the short one.
1058         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(
1059             FiniteDuration.create(30, TimeUnit.SECONDS));
1060
1061         // Test with persistence disabled.
1062
1063         mockRaftActor.setPersistence(false);
1064         reset(mockRaftActor.snapshotCohortDelegate);
1065
1066         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1067         reply = kit.expectMsgClass(GetSnapshotReply.class);
1068         verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(), any());
1069
             // Without persistence only the election information is reported; indices and terms are
             // -1, the state is EmptyState and there are no unapplied entries.
1070         assertEquals("getId", persistenceId, reply.getId());
1071         replySnapshot = reply.getSnapshot();
1072         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1073         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1074         assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
1075         assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
1076         assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
1077         assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
1078         assertEquals("getState type", EmptyState.INSTANCE, replySnapshot.getState());
1079         assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
1080
1081         TEST_LOG.info("testGetSnapshot ending");
1082     }
1083
1084     @Test
1085     public void testRestoreFromSnapshot() {
             // Checks that an actor built with restoreFromSnapshot(...) saves the supplied snapshot,
             // applies its state and initializes log, indices and election information from it.
             // A second scenario repeats the restore with data persistence disabled.
1086         TEST_LOG.info("testRestoreFromSnapshot starting");
1087
1088         String persistenceId = factory.generateActorId("test-actor-");
1089         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1090         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1091
             // Snapshot under test: state A-D, last applied index 3, plus one unapplied entry at
             // index 4, election term 1 voted for "member-1".
1092         List<ReplicatedLogEntry> snapshotUnappliedEntries = List.of(
1093             new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E")));
1094
1095         int snapshotLastApplied = 3;
1096         int snapshotLastIndex = 4;
1097
1098         MockSnapshotState snapshotState = new MockSnapshotState(List.of(
1099                 new MockRaftActorContext.MockPayload("A"),
1100                 new MockRaftActorContext.MockPayload("B"),
1101                 new MockRaftActorContext.MockPayload("C"),
1102                 new MockRaftActorContext.MockPayload("D")));
1103
1104         Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries,
1105                 snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1", null);
1106
             // Register the latch before the actor starts so that waitForSavedSnapshot() below can
             // wait for the save.
1107         InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
1108
1109         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1110                 .config(config).restoreFromSnapshot(snapshot).props()
1111                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1112         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1113
1114         mockRaftActor.waitForRecoveryComplete();
1115
             // The snapshot written to the store must match the one supplied for restoration.
1116         Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class);
1117         assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm());
1118         assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor());
1119         assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex());
1120         assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
1121         assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
1122         assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
1123         assertEquals("getState", snapshot.getState(), savedSnapshot.getState());
1124         assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
1125
1126         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(Snapshot.State.class));
1127
             // The actor's context must now reflect the snapshot: only the unapplied entry is in
             // the log, and commit index equals last applied.
1128         RaftActorContext context = mockRaftActor.getRaftActorContext();
1129         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1130         assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
1131         assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
1132         assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
1133         assertEquals("Recovered state", snapshotState.getState(), mockRaftActor.getState());
1134         assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
1135         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1136
1137         // Test with data persistence disabled
1138
             // Empty snapshot that carries only election information (term 5, voted for "member-1").
1139         snapshot = Snapshot.create(EmptyState.INSTANCE, List.of(),
1140                 -1, -1, -1, -1, 5, "member-1", null);
1141
1142         persistenceId = factory.generateActorId("test-actor-");
1143
1144         raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1145                 .config(config).restoreFromSnapshot(snapshot)
1146                 .persistent(Optional.of(Boolean.FALSE)).props()
1147                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1148         mockRaftActor = raftActorRef.underlyingActor();
1149
1150         mockRaftActor.waitForRecoveryComplete();
             // Wait up to 5 seconds for the actor to signal that the snapshot was committed.
1151         assertEquals("snapshot committed", true,
1152                 Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS));
1153
1154         context = mockRaftActor.getRaftActorContext();
1155         assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm());
1156         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1157
1158         TEST_LOG.info("testRestoreFromSnapshot ending");
1159     }
1160
1161     @Test
1162     public void testRestoreFromSnapshotWithRecoveredData() throws Exception {
             // Checks that a restore-from snapshot is not applied when the journal already holds
             // an entry: the cohort's applySnapshot() is never called and the context reflects only
             // the recovered journal entry.
1163         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting");
1164
1165         String persistenceId = factory.generateActorId("test-actor-");
1166         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1167         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1168
             // Snapshot offered for restoration: indices 5 and terms 2, voted for "member-1".
1169         List<MockPayload> state = List.of(new MockRaftActorContext.MockPayload("A"));
1170         Snapshot snapshot = Snapshot.create(ByteState.of(fromObject(state).toByteArray()),
1171                 List.of(), 5, 2, 5, 2, 2, "member-1", null);
1172
             // Existing journal content, added before the actor is created.
1173         InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1,
1174                 new MockRaftActorContext.MockPayload("B")));
1175
1176         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1177                 .config(config).restoreFromSnapshot(snapshot).props()
1178                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1179         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1180
1181         mockRaftActor.waitForRecoveryComplete();
1182
             // Wait 500 ms before the never() verification - presumably to give a late
             // applySnapshot() call time to show up.
1183         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1184         verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(Snapshot.State.class));
1185
             // None of the snapshot's values (index 5, term 2, "member-1") appear in the context.
1186         RaftActorContext context = mockRaftActor.getRaftActorContext();
1187         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1188         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
1189         assertEquals("Last applied", -1, context.getLastApplied());
1190         assertEquals("Commit index", -1, context.getCommitIndex());
1191         assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
1192         assertEquals("Voted for", null, context.getTermInformation().getVotedFor());
1193
1194         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
1195     }
1196
1197     @Test
1198     public void testNonVotingOnRecovery() {
1199         TEST_LOG.info("testNonVotingOnRecovery starting");
1200
1201         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1202         config.setElectionTimeoutFactor(1);
1203         config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
1204
1205         String persistenceId = factory.generateActorId("test-actor-");
1206         InMemoryJournal.addEntry(persistenceId, 1,  new SimpleReplicatedLogEntry(0, 1,
1207                 new ServerConfigurationPayload(List.of(new ServerInfo(persistenceId, false)))));
1208
1209         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1210                 .config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1211         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1212
1213         mockRaftActor.waitForInitializeBehaviorComplete();
1214
1215         // Sleep a bit and verify it didn't get an election timeout and schedule an election.
1216
1217         Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
1218         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
1219
1220         TEST_LOG.info("testNonVotingOnRecovery ending");
1221     }
1222
1223     @Test
1224     public void testLeaderTransitioning() {
1225         TEST_LOG.info("testLeaderTransitioning starting");
1226
1227         ActorRef notifierActor = factory.createActor(MessageCollectorActor.props());
1228
1229         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1230         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1231
1232         String persistenceId = factory.generateActorId("test-actor-");
1233
1234         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1235                 .config(config).roleChangeNotifier(notifierActor).props()
1236                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1237         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1238
1239         mockRaftActor.waitForInitializeBehaviorComplete();
1240
1241         raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, List.of(),
1242                 0L, -1L, (short)1), ActorRef.noSender());
1243         LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1244                 notifierActor, LeaderStateChanged.class);
1245         assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
1246
1247         MessageCollectorActor.clearMessages(notifierActor);
1248
1249         raftActorRef.tell(new LeaderTransitioning("leader"), ActorRef.noSender());
1250
1251         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1252         assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
1253         assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
1254
1255         TEST_LOG.info("testLeaderTransitioning ending");
1256     }
1257
1258     @SuppressWarnings({ "unchecked", "rawtypes" })
1259     @Test
1260     public void testReplicateWithPersistencePending() throws Exception {
1261         final String leaderId = factory.generateActorId("leader-");
1262         final String followerId = factory.generateActorId("follower-");
1263
1264         final ActorRef followerActor = factory.createActor(MessageCollectorActor.props());
1265
1266         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1267         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1268         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1269
1270         DataPersistenceProvider mockPersistenceProvider = mock(DataPersistenceProvider.class);
1271         doReturn(true).when(mockPersistenceProvider).isRecoveryApplicable();
1272
1273         TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
1274                 MockRaftActor.props(leaderId, Map.of(followerId, followerActor.path().toString()), config,
1275                         mockPersistenceProvider), leaderId);
1276         MockRaftActor leaderActor = leaderActorRef.underlyingActor();
1277         leaderActor.waitForInitializeBehaviorComplete();
1278
1279         leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
1280
1281         Leader leader = new Leader(leaderActor.getRaftActorContext());
1282         leaderActor.setCurrentBehavior(leader);
1283
1284         leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"),
1285                 false);
1286
1287         ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
1288         assertNotNull("ReplicatedLogEntry not found", logEntry);
1289         assertEquals("isPersistencePending", true, logEntry.isPersistencePending());
1290         assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
1291
1292         leaderActor.handleCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
1293         assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
1294
1295         ArgumentCaptor<Procedure> callbackCaptor = ArgumentCaptor.forClass(Procedure.class);
1296         verify(mockPersistenceProvider).persistAsync(eq(logEntry), callbackCaptor.capture());
1297
1298         callbackCaptor.getValue().apply(logEntry);
1299
1300         assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
1301         assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
1302     }
1303
1304     @Test
1305     public void testReplicateWithBatchHint() throws Exception {
1306         final String leaderId = factory.generateActorId("leader-");
1307         final String followerId = factory.generateActorId("follower-");
1308
1309         final ActorRef followerActor = factory.createActor(MessageCollectorActor.props());
1310
1311         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1312         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1313         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1314
1315         TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
1316                 MockRaftActor.props(leaderId, Map.of(followerId, followerActor.path().toString()), config),
1317                     leaderId);
1318         MockRaftActor leaderActor = leaderActorRef.underlyingActor();
1319         leaderActor.waitForInitializeBehaviorComplete();
1320
1321         leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
1322
1323         Leader leader = new Leader(leaderActor.getRaftActorContext());
1324         leaderActor.setCurrentBehavior(leader);
1325
1326         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1327         MessageCollectorActor.clearMessages(followerActor);
1328
1329         leaderActor.handleCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0));
1330
1331         leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true);
1332         MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
1333
1334         leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true);
1335         MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
1336
1337         leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false);
1338         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1339         assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
1340     }
1341
1342     @Test
1343     @SuppressWarnings("checkstyle:illegalcatch")
1344     public void testApplyStateRace() throws Exception {
1345         final String leaderId = factory.generateActorId("leader-");
1346         final String followerId = factory.generateActorId("follower-");
1347
1348         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1349         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1350         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1351
1352         ActorRef mockFollowerActorRef = factory.createActor(MessageCollectorActor.props());
1353
1354         TestRaftActor.Builder builder = TestRaftActor.newBuilder()
1355                 .id(leaderId)
1356                 .peerAddresses(Map.of(followerId, mockFollowerActorRef.path().toString()))
1357                 .config(config)
1358                 .collectorActor(factory.createActor(
1359                         MessageCollectorActor.props(), factory.generateActorId(leaderId + "-collector")));
1360
1361         TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
1362                 builder.props(), leaderId);
1363         MockRaftActor leaderActor = leaderActorRef.underlyingActor();
1364         leaderActor.waitForInitializeBehaviorComplete();
1365
1366         leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
1367         Leader leader = new Leader(leaderActor.getRaftActorContext());
1368         leaderActor.setCurrentBehavior(leader);
1369
1370         final ExecutorService executorService = Executors.newSingleThreadExecutor();
1371
1372         leaderActor.setPersistence(new PersistentDataProvider(leaderActor) {
1373             @Override
1374             public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
1375                 // needs to be executed from another thread to simulate the persistence actor calling this callback
1376                 executorService.submit(() -> {
1377                     try {
1378                         procedure.apply(entry);
1379                     } catch (Exception e) {
1380                         TEST_LOG.info("Fail during async persist callback", e);
1381                     }
1382                 }, "persistence-callback");
1383             }
1384         });
1385
1386         leader.getFollower(followerId).setNextIndex(0);
1387         leader.getFollower(followerId).setMatchIndex(-1);
1388
1389         // hitting this is flimsy so run multiple times to improve the chance of things
1390         // blowing up while breaking actor containment
1391         final TestPersist message =
1392                 new TestPersist(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"));
1393         for (int i = 0; i < 100; i++) {
1394             leaderActorRef.tell(message, null);
1395
1396             AppendEntriesReply reply =
1397                     new AppendEntriesReply(followerId, 1, true, i, 1, (short) 5);
1398             leaderActorRef.tell(reply, mockFollowerActorRef);
1399         }
1400
1401         await("Persistence callback.").atMost(5, TimeUnit.SECONDS).until(() -> leaderActor.getState().size() == 100);
1402         executorService.shutdown();
1403     }
1404 }