Remove support for pre-Fluorine versions
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNotSame;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.ArgumentMatchers.eq;
19 import static org.mockito.ArgumentMatchers.same;
20 import static org.mockito.Mockito.doReturn;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.timeout;
25 import static org.mockito.Mockito.verify;
26
27 import akka.actor.ActorRef;
28 import akka.actor.PoisonPill;
29 import akka.actor.Status.Failure;
30 import akka.actor.Terminated;
31 import akka.dispatch.Dispatchers;
32 import akka.japi.Procedure;
33 import akka.persistence.SaveSnapshotFailure;
34 import akka.persistence.SaveSnapshotSuccess;
35 import akka.persistence.SnapshotMetadata;
36 import akka.persistence.SnapshotOffer;
37 import akka.protobuf.ByteString;
38 import akka.testkit.TestActorRef;
39 import akka.testkit.javadsl.TestKit;
40 import com.google.common.util.concurrent.Uninterruptibles;
41 import java.io.ByteArrayOutputStream;
42 import java.io.ObjectOutputStream;
43 import java.time.Duration;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Optional;
47 import java.util.concurrent.ExecutorService;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Before;
53 import org.junit.Test;
54 import org.mockito.ArgumentCaptor;
55 import org.opendaylight.controller.cluster.DataPersistenceProvider;
56 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
57 import org.opendaylight.controller.cluster.PersistentDataProvider;
58 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
59 import org.opendaylight.controller.cluster.notifications.RoleChanged;
60 import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestPersist;
61 import org.opendaylight.controller.cluster.raft.AbstractRaftActorIntegrationTest.TestRaftActor;
62 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
63 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
64 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
65 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
66 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
67 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
68 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
69 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
70 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
71 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
72 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
73 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
74 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
75 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
76 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
77 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
78 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
79 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
80 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
81 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
82 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
83 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
84 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
85 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
86 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
87 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
88 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
89 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
90 import org.opendaylight.yangtools.concepts.Identifier;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
93 import scala.concurrent.duration.FiniteDuration;
94
95 public class RaftActorTest extends AbstractActorTest {
96
97     static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
98
99     private TestActorFactory factory;
100
101     @Before
102     public void setUp() {
103         factory = new TestActorFactory(getSystem());
104     }
105
106     @After
107     public void tearDown() {
108         factory.close();
109         InMemoryJournal.clear();
110         InMemorySnapshotStore.clear();
111     }
112
113     @Test
114     public void testConstruction() {
115         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
116     }
117
118     @Test
119     public void testFindLeaderWhenLeaderIsSelf() {
120         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
121         kit.waitUntilLeader();
122     }
123
124
125     @Test
126     public void testRaftActorRecoveryWithPersistenceEnabled() {
127         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
128
129         TestKit kit = new TestKit(getSystem());
130         String persistenceId = factory.generateActorId("follower-");
131
132         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
133
134         // Set the heartbeat interval high to essentially disable election otherwise the test
135         // may fail if the actor is switched to Leader and the commitIndex is set to the last
136         // log entry.
137         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
138
139         Map<String, String> peerAddresses = Map.of("member1", "address");
140         ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
141                 peerAddresses, config), persistenceId);
142
143         kit.watch(followerActor);
144
145         List<ReplicatedLogEntry> snapshotUnappliedEntries = List.of(
146             new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E")));
147
148         int lastAppliedDuringSnapshotCapture = 3;
149         int lastIndexDuringSnapshotCapture = 4;
150
151         // 4 messages as part of snapshot, which are applied to state
152         MockSnapshotState snapshotState = new MockSnapshotState(List.of(
153                 new MockRaftActorContext.MockPayload("A"),
154                 new MockRaftActorContext.MockPayload("B"),
155                 new MockRaftActorContext.MockPayload("C"),
156                 new MockRaftActorContext.MockPayload("D")));
157
158         Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
159                 lastAppliedDuringSnapshotCapture, 1, -1, null, null);
160         InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
161
162         // add more entries after snapshot is taken
163         ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F", 2));
164         ReplicatedLogEntry entry3 = new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("G", 3));
165         ReplicatedLogEntry entry4 = new SimpleReplicatedLogEntry(7, 1, new MockRaftActorContext.MockPayload("H", 4));
166
167         final int lastAppliedToState = 5;
168         final int lastIndex = 7;
169
170         InMemoryJournal.addEntry(persistenceId, 5, entry2);
171         // 2 entries are applied to state besides the 4 entries in snapshot
172         InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
173         InMemoryJournal.addEntry(persistenceId, 7, entry3);
174         InMemoryJournal.addEntry(persistenceId, 8, entry4);
175
176         // kill the actor
177         followerActor.tell(PoisonPill.getInstance(), null);
178         kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
179
180         kit.unwatch(followerActor);
181
182         //reinstate the actor
183         TestActorRef<MockRaftActor> ref = factory.createTestActor(
184                 MockRaftActor.props(persistenceId, peerAddresses, config));
185
186         MockRaftActor mockRaftActor = ref.underlyingActor();
187
188         mockRaftActor.waitForRecoveryComplete();
189
190         RaftActorContext context = mockRaftActor.getRaftActorContext();
191         assertEquals("Journal log size", snapshotUnappliedEntries.size() + 3,
192                 context.getReplicatedLog().size());
193         assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
194         assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
195         assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
196         assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
197         assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
198
199         mockRaftActor.waitForInitializeBehaviorComplete();
200
201         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
202
203         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
204     }
205
206     @Test
207     public void testRaftActorRecoveryWithPersistenceDisabled() {
208         String persistenceId = factory.generateActorId("follower-");
209
210         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
211
212         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
213
214         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
215                 Map.of("member1", "address"), config, createProvider()), persistenceId);
216
217         MockRaftActor mockRaftActor = ref.underlyingActor();
218
219         mockRaftActor.waitForRecoveryComplete();
220
221         mockRaftActor.waitForInitializeBehaviorComplete();
222
223         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
224     }
225
226     @Test
227     public void testUpdateElectionTermPersistedWithPersistenceDisabled() {
228         final TestKit kit = new TestKit(getSystem());
229         String persistenceId = factory.generateActorId("follower-");
230         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
231         config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
232         config.setElectionTimeoutFactor(1);
233
234         InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
235
236         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
237                 Map.of("member1", "address"), config, createProvider())
238                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
239
240         InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
241         List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
242         assertEquals("UpdateElectionTerm entries", 1, entries.size());
243         final UpdateElectionTerm updateEntry = entries.get(0);
244
245         factory.killActor(ref, kit);
246
247         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
248         ref = factory.createTestActor(MockRaftActor.props(persistenceId, Map.of("member1", "address"), config,
249                 createProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
250                 factory.generateActorId("follower-"));
251
252         MockRaftActor actor = ref.underlyingActor();
253         actor.waitForRecoveryComplete();
254
255         RaftActorContext newContext = actor.getRaftActorContext();
256         assertEquals("electionTerm", updateEntry.getCurrentTerm(),
257                 newContext.getTermInformation().getCurrentTerm());
258         assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
259
260         entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
261         assertEquals("UpdateElectionTerm entries", 1, entries.size());
262     }
263
264     @Test
265     public void testRaftActorForwardsToRaftActorRecoverySupport() {
266         String persistenceId = factory.generateActorId("leader-");
267
268         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
269
270         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
271
272         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
273                 Map.of(), config), persistenceId);
274
275         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
276
277         // Wait for akka's recovery to complete so it doesn't interfere.
278         mockRaftActor.waitForRecoveryComplete();
279
280         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
281         mockRaftActor.setRaftActorRecoverySupport(mockSupport);
282
283         Snapshot snapshot = Snapshot.create(ByteState.of(new byte[]{1}),
284                 List.of(), 3, 1, 3, 1, -1, null, null);
285         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
286         mockRaftActor.handleRecover(snapshotOffer);
287
288         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
289         mockRaftActor.handleRecover(logEntry);
290
291         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
292         mockRaftActor.handleRecover(applyJournalEntries);
293
294         DeleteEntries deleteEntries = new DeleteEntries(1);
295         mockRaftActor.handleRecover(deleteEntries);
296
297         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
298         mockRaftActor.handleRecover(updateElectionTerm);
299
300         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
301         verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
302         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
303         verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
304         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
305     }
306
307     @Test
308     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
309         String persistenceId = factory.generateActorId("leader-");
310
311         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
312
313         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
314
315         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
316
317         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
318                 .config(config).snapshotMessageSupport(mockSupport).props());
319
320         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
321
322         // Wait for akka's recovery to complete so it doesn't interfere.
323         mockRaftActor.waitForRecoveryComplete();
324
325         ApplySnapshot applySnapshot = new ApplySnapshot(
326             Snapshot.create(null, null, 0, 0, 0, 0, 0, persistenceId, null));
327         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
328         mockRaftActor.handleCommand(applySnapshot);
329
330         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(ByteState.empty(), Optional.empty());
331         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
332         mockRaftActor.handleCommand(captureSnapshotReply);
333
334         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L));
335         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
336         mockRaftActor.handleCommand(saveSnapshotSuccess);
337
338         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L),
339                 new Throwable());
340         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
341         mockRaftActor.handleCommand(saveSnapshotFailure);
342
343         doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
344                 any(ActorRef.class));
345         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
346
347         doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
348         mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
349
350         verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
351         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
352         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
353         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
354         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
355                 any(ActorRef.class));
356         verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
357     }
358
359     @SuppressWarnings("unchecked")
360     @Test
361     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
362         String persistenceId = factory.generateActorId("leader-");
363
364         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
365
366         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
367
368         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
369
370         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
371                 Map.of(), config, dataPersistenceProvider), persistenceId);
372
373         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
374
375         mockRaftActor.waitForInitializeBehaviorComplete();
376
377         mockRaftActor.waitUntilLeader();
378
379         mockRaftActor.handleCommand(new ApplyJournalEntries(10));
380
381         verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Procedure.class));
382     }
383
384     @Test
385     public void testApplyState() {
386         String persistenceId = factory.generateActorId("leader-");
387
388         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
389
390         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
391
392         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
393
394         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
395                 Map.of(), config, dataPersistenceProvider), persistenceId);
396
397         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
398
399         mockRaftActor.waitForInitializeBehaviorComplete();
400
401         ReplicatedLogEntry entry = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F"));
402
403         final Identifier id = new MockIdentifier("apply-state");
404         mockRaftActor.getRaftActorContext().getApplyStateConsumer().accept(new ApplyState(mockActorRef, id, entry));
405
406         verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), any());
407     }
408
409     @Test
410     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
411         ActorRef notifierActor = factory.createActor(MessageCollectorActor.props());
412         MessageCollectorActor.waitUntilReady(notifierActor);
413
414         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
415         long heartBeatInterval = 100;
416         config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
417         config.setElectionTimeoutFactor(20);
418
419         String persistenceId = factory.generateActorId("notifier-");
420
421         final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
422                 .id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
423                     createProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
424                 persistenceId);
425
426         List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
427
428
429         // check if the notifier got a role change from null to Follower
430         RoleChanged raftRoleChanged = matches.get(0);
431         assertEquals(persistenceId, raftRoleChanged.getMemberId());
432         assertNull(raftRoleChanged.getOldRole());
433         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
434
435         // check if the notifier got a role change from Follower to Candidate
436         raftRoleChanged = matches.get(1);
437         assertEquals(persistenceId, raftRoleChanged.getMemberId());
438         assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
439         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
440
441         // check if the notifier got a role change from Candidate to Leader
442         raftRoleChanged = matches.get(2);
443         assertEquals(persistenceId, raftRoleChanged.getMemberId());
444         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
445         assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
446
447         LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
448                 notifierActor, LeaderStateChanged.class);
449
450         assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
451         assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
452
453         MessageCollectorActor.clearMessages(notifierActor);
454
455         MockRaftActor raftActor = raftActorRef.underlyingActor();
456         final String newLeaderId = "new-leader";
457         final short newLeaderVersion = 6;
458         Follower follower = new Follower(raftActor.getRaftActorContext()) {
459             @Override
460             public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
461                 setLeaderId(newLeaderId);
462                 setLeaderPayloadVersion(newLeaderVersion);
463                 return this;
464             }
465         };
466
467         raftActor.newBehavior(follower);
468
469         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
470         assertEquals(persistenceId, leaderStateChange.getMemberId());
471         assertEquals(null, leaderStateChange.getLeaderId());
472
473         raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
474         assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
475         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
476
477         MessageCollectorActor.clearMessages(notifierActor);
478
479         raftActor.handleCommand("any");
480
481         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
482         assertEquals(persistenceId, leaderStateChange.getMemberId());
483         assertEquals(newLeaderId, leaderStateChange.getLeaderId());
484         assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
485
486         MessageCollectorActor.clearMessages(notifierActor);
487
488         raftActor.handleCommand("any");
489
490         Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
491         leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
492         assertNull(leaderStateChange);
493     }
494
495     @Test
496     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
497         ActorRef notifierActor = factory.createActor(MessageCollectorActor.props());
498         MessageCollectorActor.waitUntilReady(notifierActor);
499
500         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
501         long heartBeatInterval = 100;
502         config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
503         config.setElectionTimeoutFactor(1);
504
505         String persistenceId = factory.generateActorId("notifier-");
506
507         factory.createActor(MockRaftActor.builder().id(persistenceId)
508                 .peerAddresses(Map.of("leader", "fake/path"))
509                 .config(config).roleChangeNotifier(notifierActor).props());
510
511         List<RoleChanged> matches =  null;
512         for (int i = 0; i < 5000 / heartBeatInterval; i++) {
513             matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
514             assertNotNull(matches);
515             if (matches.size() == 3) {
516                 break;
517             }
518             Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
519         }
520
521         assertNotNull(matches);
522         assertEquals(2, matches.size());
523
524         // check if the notifier got a role change from null to Follower
525         RoleChanged raftRoleChanged = matches.get(0);
526         assertEquals(persistenceId, raftRoleChanged.getMemberId());
527         assertNull(raftRoleChanged.getOldRole());
528         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
529
530         // check if the notifier got a role change from Follower to Candidate
531         raftRoleChanged = matches.get(1);
532         assertEquals(persistenceId, raftRoleChanged.getMemberId());
533         assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
534         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
535     }
536
537     @Test
538     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
539         final String persistenceId = factory.generateActorId("leader-");
540         final String follower1Id = factory.generateActorId("follower-");
541
542         ActorRef followerActor1 = factory.createActor(MessageCollectorActor.props());
543
544         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
545         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
546         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
547
548         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
549
550         Map<String, String> peerAddresses = Map.of(follower1Id, followerActor1.path().toString());
551
552         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
553                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
554
555         MockRaftActor leaderActor = mockActorRef.underlyingActor();
556
557         leaderActor.getRaftActorContext().setCommitIndex(4);
558         leaderActor.getRaftActorContext().setLastApplied(4);
559         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
560
561         leaderActor.waitForInitializeBehaviorComplete();
562
563         // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
564
565         Leader leader = new Leader(leaderActor.getRaftActorContext());
566         leaderActor.setCurrentBehavior(leader);
567         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
568
569         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
570         leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
571
572         assertEquals(8, leaderActor.getReplicatedLog().size());
573
574         leaderActor.getRaftActorContext().getSnapshotManager().capture(
575                 new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("x")), 4);
576
577         verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(), any());
578
579         assertEquals(8, leaderActor.getReplicatedLog().size());
580
581         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
582         //fake snapshot on index 5
583         leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
584
585         assertEquals(8, leaderActor.getReplicatedLog().size());
586
587         //fake snapshot on index 6
588         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
589         leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
590         assertEquals(8, leaderActor.getReplicatedLog().size());
591
592         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
593
594         assertEquals(8, leaderActor.getReplicatedLog().size());
595
596         MockSnapshotState snapshotState = new MockSnapshotState(List.of(
597                 new MockRaftActorContext.MockPayload("foo-0"),
598                 new MockRaftActorContext.MockPayload("foo-1"),
599                 new MockRaftActorContext.MockPayload("foo-2"),
600                 new MockRaftActorContext.MockPayload("foo-3"),
601                 new MockRaftActorContext.MockPayload("foo-4")));
602
603         leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotState, Optional.empty(),
604                 Runtime.getRuntime().totalMemory());
605
606         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
607
608         // The commit is needed to complete the snapshot creation process
609         leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
610
611         // capture snapshot reply should remove the snapshotted entries only
612         assertEquals(3, leaderActor.getReplicatedLog().size());
613         assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
614
615         // add another non-replicated entry
616         leaderActor.getReplicatedLog().append(
617                 new SimpleReplicatedLogEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
618
619         //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
620         leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
621         assertEquals(2, leaderActor.getReplicatedLog().size());
622         assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
623     }
624
625     @Test
626     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
627         final String persistenceId = factory.generateActorId("follower-");
628         final String leaderId = factory.generateActorId("leader-");
629
630         ActorRef leaderActor1 = factory.createActor(MessageCollectorActor.props());
631
632         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
633         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
634         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
635
636         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
637
638         Map<String, String> peerAddresses = Map.of(leaderId, leaderActor1.path().toString());
639
640         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
641                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
642
643         MockRaftActor followerActor = mockActorRef.underlyingActor();
644         followerActor.getRaftActorContext().setCommitIndex(4);
645         followerActor.getRaftActorContext().setLastApplied(4);
646         followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
647
648         followerActor.waitForInitializeBehaviorComplete();
649
650
651         Follower follower = new Follower(followerActor.getRaftActorContext());
652         followerActor.setCurrentBehavior(follower);
653         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
654
655         // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
656         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
657         followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
658
659         // log has indices 0-5
660         assertEquals(6, followerActor.getReplicatedLog().size());
661
662         //snapshot on 4
663         followerActor.getRaftActorContext().getSnapshotManager().capture(
664                 new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("D")), 4);
665
666         verify(followerActor.snapshotCohortDelegate).createSnapshot(any(), any());
667
668         assertEquals(6, followerActor.getReplicatedLog().size());
669
670         //fake snapshot on index 6
671         List<ReplicatedLogEntry> entries = List.of(
672                 new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("foo-6")));
673         followerActor.handleCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
674         assertEquals(7, followerActor.getReplicatedLog().size());
675
676         //fake snapshot on index 7
677         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
678
679         entries = List.of(new SimpleReplicatedLogEntry(7, 1,
680                 new MockRaftActorContext.MockPayload("foo-7")));
681         followerActor.handleCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
682         assertEquals(8, followerActor.getReplicatedLog().size());
683
684         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
685
686
687         ByteString snapshotBytes = fromObject(List.of(
688                 new MockRaftActorContext.MockPayload("foo-0"),
689                 new MockRaftActorContext.MockPayload("foo-1"),
690                 new MockRaftActorContext.MockPayload("foo-2"),
691                 new MockRaftActorContext.MockPayload("foo-3"),
692                 new MockRaftActorContext.MockPayload("foo-4")));
693         followerActor.handleCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
694                 Optional.empty()));
695         assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
696
697         // The commit is needed to complete the snapshot creation process
698         followerActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
699
700         // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
701         assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
702         assertEquals(7, followerActor.getReplicatedLog().lastIndex());
703
704         entries = List.of(new SimpleReplicatedLogEntry(8, 1, new MockRaftActorContext.MockPayload("foo-7")));
705         // send an additional entry 8 with leaderCommit = 7
706         followerActor.handleCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
707
708         // 7 and 8, as lastapplied is 7
709         assertEquals(2, followerActor.getReplicatedLog().size());
710     }
711
712     @Test
713     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
714         final String persistenceId = factory.generateActorId("leader-");
715         final String follower1Id = factory.generateActorId("follower-");
716         final String follower2Id = factory.generateActorId("follower-");
717
718         final ActorRef followerActor1 = factory.createActor(MessageCollectorActor.props(), follower1Id);
719         final ActorRef followerActor2 = factory.createActor(MessageCollectorActor.props(), follower2Id);
720
721         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
722         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
723         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
724
725         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
726
727         Map<String, String> peerAddresses = Map.of(
728             follower1Id, followerActor1.path().toString(),
729             follower2Id, followerActor2.path().toString());
730
731         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
732                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
733
734         MockRaftActor leaderActor = mockActorRef.underlyingActor();
735         leaderActor.getRaftActorContext().setCommitIndex(9);
736         leaderActor.getRaftActorContext().setLastApplied(9);
737         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
738
739         leaderActor.waitForInitializeBehaviorComplete();
740
741         Leader leader = new Leader(leaderActor.getRaftActorContext());
742         leaderActor.setCurrentBehavior(leader);
743         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
744
745         // create 5 entries in the log
746         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
747         leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
748
749         //set the snapshot index to 4 , 0 to 4 are snapshotted
750         leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
751         //setting replicatedToAllIndex = 9, for the log to clear
752         leader.setReplicatedToAllIndex(9);
753         assertEquals(5, leaderActor.getReplicatedLog().size());
754         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
755
756         leaderActor.handleCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
757         assertEquals(5, leaderActor.getReplicatedLog().size());
758         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
759
760         // set the 2nd follower nextIndex to 1 which has been snapshotted
761         leaderActor.handleCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
762         assertEquals(5, leaderActor.getReplicatedLog().size());
763         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
764
765         // simulate a real snapshot
766         leaderActor.handleCommand(SendHeartBeat.INSTANCE);
767         assertEquals(5, leaderActor.getReplicatedLog().size());
768         assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
769                 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()),
770                 RaftState.Leader, leaderActor.getCurrentBehavior().state());
771
772
773         //reply from a slow follower does not initiate a fake snapshot
774         leaderActor.handleCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
775         assertEquals("Fake snapshot should not happen when Initiate is in progress", 5,
776                 leaderActor.getReplicatedLog().size());
777
778         ByteString snapshotBytes = fromObject(List.of(
779                 new MockRaftActorContext.MockPayload("foo-0"),
780                 new MockRaftActorContext.MockPayload("foo-1"),
781                 new MockRaftActorContext.MockPayload("foo-2"),
782                 new MockRaftActorContext.MockPayload("foo-3"),
783                 new MockRaftActorContext.MockPayload("foo-4")));
784         leaderActor.handleCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
785                 Optional.empty()));
786         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
787
788         assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0,
789                 leaderActor.getReplicatedLog().size());
790
791         //reply from a slow follower after should not raise errors
792         leaderActor.handleCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
793         assertEquals(0, leaderActor.getReplicatedLog().size());
794     }
795
796     @Test
797     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
798         String persistenceId = factory.generateActorId("leader-");
799         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
800         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
801         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
802         config.setSnapshotBatchCount(5);
803
804         DataPersistenceProvider dataPersistenceProvider = createProvider();
805
806         Map<String, String> peerAddresses = Map.of("member1", "address");
807
808         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
809                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
810
811         MockRaftActor leaderActor = mockActorRef.underlyingActor();
812         leaderActor.getRaftActorContext().setCommitIndex(3);
813         leaderActor.getRaftActorContext().setLastApplied(3);
814         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
815
816         leaderActor.waitForInitializeBehaviorComplete();
817         for (int i = 0; i < 4; i++) {
818             leaderActor.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
819                     new MockRaftActorContext.MockPayload("A")));
820         }
821
822         Leader leader = new Leader(leaderActor.getRaftActorContext());
823         leaderActor.setCurrentBehavior(leader);
824         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
825
826         // Simulate an install snaphost to a follower.
827         leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
828                 leaderActor.getReplicatedLog().last(), -1, "member1");
829
830         // Now send a CaptureSnapshotReply
831         mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
832                 Optional.empty()), mockActorRef);
833
834         // Trimming log in this scenario is a no-op
835         assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
836         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
837         assertEquals(-1, leader.getReplicatedToAllIndex());
838     }
839
840     @Test
841     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
842         String persistenceId = factory.generateActorId("leader-");
843         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
844         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
845         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
846         config.setSnapshotBatchCount(5);
847
848         DataPersistenceProvider dataPersistenceProvider = createProvider();
849
850         Map<String, String> peerAddresses = Map.of("member1", "address");
851
852         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
853                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
854
855         MockRaftActor leaderActor = mockActorRef.underlyingActor();
856         leaderActor.getRaftActorContext().setCommitIndex(3);
857         leaderActor.getRaftActorContext().setLastApplied(3);
858         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
859         leaderActor.getReplicatedLog().setSnapshotIndex(3);
860
861         leaderActor.waitForInitializeBehaviorComplete();
862         Leader leader = new Leader(leaderActor.getRaftActorContext());
863         leaderActor.setCurrentBehavior(leader);
864         leader.setReplicatedToAllIndex(3);
865         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
866
867         // Persist another entry (this will cause a CaptureSnapshot to be triggered
868         leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
869                 new MockRaftActorContext.MockPayload("duh"), false);
870
871         // Now send a CaptureSnapshotReply
872         mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
873                 Optional.empty()), mockActorRef);
874
875         // Trimming log in this scenario is a no-op
876         assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
877         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
878         assertEquals(3, leader.getReplicatedToAllIndex());
879     }
880
881     private static DataPersistenceProvider createProvider() {
882         return new NonPersistentDataProvider(Runnable::run);
883     }
884
885     @Test
886     public void testSwitchBehavior() {
887         String persistenceId = factory.generateActorId("leader-");
888         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
889         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
890         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
891         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
892         config.setSnapshotBatchCount(5);
893
894         DataPersistenceProvider dataPersistenceProvider = createProvider();
895
896         Map<String, String> peerAddresses = Map.of();
897
898         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
899                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
900
901         MockRaftActor leaderActor = mockActorRef.underlyingActor();
902
903         leaderActor.waitForRecoveryComplete();
904
905         leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
906
907         assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
908         assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
909
910         leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
911
912         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
913         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
914
915         leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
916
917         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
918         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
919
920         leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
921
922         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
923         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
924     }
925
926     public static ByteString fromObject(final Object snapshot) throws Exception {
927         ByteArrayOutputStream bos = null;
928         ObjectOutputStream os = null;
929         try {
930             bos = new ByteArrayOutputStream();
931             os = new ObjectOutputStream(bos);
932             os.writeObject(snapshot);
933             byte[] snapshotBytes = bos.toByteArray();
934             return ByteString.copyFrom(snapshotBytes);
935         } finally {
936             if (os != null) {
937                 os.flush();
938                 os.close();
939             }
940             if (bos != null) {
941                 bos.close();
942             }
943         }
944     }
945
946     @Test
947     public void testUpdateConfigParam() {
948         DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
949         String persistenceId = factory.generateActorId("follower-");
950         Map<String, String> peerAddresses = Map.of("member1", "address");
951         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
952
953         TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
954                 MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
955         MockRaftActor mockRaftActor = actorRef.underlyingActor();
956         mockRaftActor.waitForInitializeBehaviorComplete();
957
958         RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior();
959         mockRaftActor.updateConfigParams(emptyConfig);
960         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
961         assertEquals("Behavior State", RaftState.Follower,
962             mockRaftActor.getCurrentBehavior().state());
963
964         DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl();
965         disableConfig.setCustomRaftPolicyImplementationClass(
966             "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
967         mockRaftActor.updateConfigParams(disableConfig);
968         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
969         assertEquals("Behavior State", RaftState.Follower,
970             mockRaftActor.getCurrentBehavior().state());
971
972         behavior = mockRaftActor.getCurrentBehavior();
973         mockRaftActor.updateConfigParams(disableConfig);
974         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
975         assertEquals("Behavior State", RaftState.Follower,
976             mockRaftActor.getCurrentBehavior().state());
977
978         DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl();
979         defaultConfig.setCustomRaftPolicyImplementationClass(
980             "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy");
981         mockRaftActor.updateConfigParams(defaultConfig);
982         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
983         assertEquals("Behavior State", RaftState.Follower,
984             mockRaftActor.getCurrentBehavior().state());
985
986         behavior = mockRaftActor.getCurrentBehavior();
987         mockRaftActor.updateConfigParams(defaultConfig);
988         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
989         assertEquals("Behavior State", RaftState.Follower,
990             mockRaftActor.getCurrentBehavior().state());
991     }
992
993     @Test
994     public void testGetSnapshot() {
995         TEST_LOG.info("testGetSnapshot starting");
996
997         final TestKit kit = new TestKit(getSystem());
998
999         String persistenceId = factory.generateActorId("test-actor-");
1000         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1001         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1002
1003         long term = 3;
1004         long seqN = 1;
1005         InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
1006         InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(0, term,
1007                 new MockRaftActorContext.MockPayload("A")));
1008         InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(1, term,
1009                 new MockRaftActorContext.MockPayload("B")));
1010         InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
1011         InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(2, term,
1012                 new MockRaftActorContext.MockPayload("C")));
1013
1014         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
1015                 Map.of("member1", "address"), config)
1016                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1017         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1018
1019         mockRaftActor.waitForRecoveryComplete();
1020
1021         mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
1022
1023         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1024
1025         ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
1026         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture(),
1027                 eq(Optional.empty()));
1028
1029         byte[] stateSnapshot = new byte[]{1,2,3};
1030         replyActor.getValue().tell(new CaptureSnapshotReply(ByteState.of(stateSnapshot), Optional.empty()),
1031                 ActorRef.noSender());
1032
1033         GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
1034
1035         assertEquals("getId", persistenceId, reply.getId());
1036         Snapshot replySnapshot = reply.getSnapshot();
1037         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1038         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1039         assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
1040         assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
1041         assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
1042         assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
1043         assertEquals("getState", ByteState.of(stateSnapshot), replySnapshot.getState());
1044         assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
1045         assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
1046
1047         // Test with timeout
1048
1049         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(
1050             FiniteDuration.create(200, TimeUnit.MILLISECONDS));
1051         reset(mockRaftActor.snapshotCohortDelegate);
1052
1053         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1054         Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
1055         assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
1056
1057         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(
1058             FiniteDuration.create(30, TimeUnit.SECONDS));
1059
1060         // Test with persistence disabled.
1061
1062         mockRaftActor.setPersistence(false);
1063         reset(mockRaftActor.snapshotCohortDelegate);
1064
1065         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1066         reply = kit.expectMsgClass(GetSnapshotReply.class);
1067         verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(), any());
1068
1069         assertEquals("getId", persistenceId, reply.getId());
1070         replySnapshot = reply.getSnapshot();
1071         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1072         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1073         assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
1074         assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
1075         assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
1076         assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
1077         assertEquals("getState type", EmptyState.INSTANCE, replySnapshot.getState());
1078         assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
1079
1080         TEST_LOG.info("testGetSnapshot ending");
1081     }
1082
1083     @Test
1084     public void testRestoreFromSnapshot() {
1085         TEST_LOG.info("testRestoreFromSnapshot starting");
1086
1087         String persistenceId = factory.generateActorId("test-actor-");
1088         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1089         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1090
1091         List<ReplicatedLogEntry> snapshotUnappliedEntries = List.of(
1092             new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E")));
1093
1094         int snapshotLastApplied = 3;
1095         int snapshotLastIndex = 4;
1096
1097         MockSnapshotState snapshotState = new MockSnapshotState(List.of(
1098                 new MockRaftActorContext.MockPayload("A"),
1099                 new MockRaftActorContext.MockPayload("B"),
1100                 new MockRaftActorContext.MockPayload("C"),
1101                 new MockRaftActorContext.MockPayload("D")));
1102
1103         Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries,
1104                 snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1", null);
1105
1106         InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
1107
1108         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1109                 .config(config).restoreFromSnapshot(snapshot).props()
1110                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1111         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1112
1113         mockRaftActor.waitForRecoveryComplete();
1114
1115         Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class);
1116         assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm());
1117         assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor());
1118         assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex());
1119         assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
1120         assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
1121         assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
1122         assertEquals("getState", snapshot.getState(), savedSnapshot.getState());
1123         assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
1124
1125         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(Snapshot.State.class));
1126
1127         RaftActorContext context = mockRaftActor.getRaftActorContext();
1128         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1129         assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
1130         assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
1131         assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
1132         assertEquals("Recovered state", snapshotState.getState(), mockRaftActor.getState());
1133         assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
1134         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1135
1136         // Test with data persistence disabled
1137
1138         snapshot = Snapshot.create(EmptyState.INSTANCE, List.of(),
1139                 -1, -1, -1, -1, 5, "member-1", null);
1140
1141         persistenceId = factory.generateActorId("test-actor-");
1142
1143         raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1144                 .config(config).restoreFromSnapshot(snapshot)
1145                 .persistent(Optional.of(Boolean.FALSE)).props()
1146                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1147         mockRaftActor = raftActorRef.underlyingActor();
1148
1149         mockRaftActor.waitForRecoveryComplete();
1150         assertEquals("snapshot committed", true,
1151                 Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS));
1152
1153         context = mockRaftActor.getRaftActorContext();
1154         assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm());
1155         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1156
1157         TEST_LOG.info("testRestoreFromSnapshot ending");
1158     }
1159
1160     @Test
1161     public void testRestoreFromSnapshotWithRecoveredData() throws Exception {
1162         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting");
1163
1164         String persistenceId = factory.generateActorId("test-actor-");
1165         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1166         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1167
1168         List<MockPayload> state = List.of(new MockRaftActorContext.MockPayload("A"));
1169         Snapshot snapshot = Snapshot.create(ByteState.of(fromObject(state).toByteArray()),
1170                 List.of(), 5, 2, 5, 2, 2, "member-1", null);
1171
1172         InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1,
1173                 new MockRaftActorContext.MockPayload("B")));
1174
1175         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1176                 .config(config).restoreFromSnapshot(snapshot).props()
1177                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1178         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1179
1180         mockRaftActor.waitForRecoveryComplete();
1181
1182         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1183         verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(Snapshot.State.class));
1184
1185         RaftActorContext context = mockRaftActor.getRaftActorContext();
1186         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1187         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
1188         assertEquals("Last applied", -1, context.getLastApplied());
1189         assertEquals("Commit index", -1, context.getCommitIndex());
1190         assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
1191         assertEquals("Voted for", null, context.getTermInformation().getVotedFor());
1192
1193         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
1194     }
1195
1196     @Test
1197     public void testNonVotingOnRecovery() {
1198         TEST_LOG.info("testNonVotingOnRecovery starting");
1199
1200         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1201         config.setElectionTimeoutFactor(1);
1202         config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
1203
1204         String persistenceId = factory.generateActorId("test-actor-");
1205         InMemoryJournal.addEntry(persistenceId, 1,  new SimpleReplicatedLogEntry(0, 1,
1206                 new ServerConfigurationPayload(List.of(new ServerInfo(persistenceId, false)))));
1207
1208         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1209                 .config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1210         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1211
1212         mockRaftActor.waitForInitializeBehaviorComplete();
1213
1214         // Sleep a bit and verify it didn't get an election timeout and schedule an election.
1215
1216         Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
1217         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
1218
1219         TEST_LOG.info("testNonVotingOnRecovery ending");
1220     }
1221
1222     @Test
1223     public void testLeaderTransitioning() {
1224         TEST_LOG.info("testLeaderTransitioning starting");
1225
1226         ActorRef notifierActor = factory.createActor(MessageCollectorActor.props());
1227
1228         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1229         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1230
1231         String persistenceId = factory.generateActorId("test-actor-");
1232
1233         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1234                 .config(config).roleChangeNotifier(notifierActor).props()
1235                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1236         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1237
1238         mockRaftActor.waitForInitializeBehaviorComplete();
1239
1240         raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, List.of(),
1241                 0L, -1L, (short)1), ActorRef.noSender());
1242         LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1243                 notifierActor, LeaderStateChanged.class);
1244         assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
1245
1246         MessageCollectorActor.clearMessages(notifierActor);
1247
1248         raftActorRef.tell(new LeaderTransitioning("leader"), ActorRef.noSender());
1249
1250         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1251         assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
1252         assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
1253
1254         TEST_LOG.info("testLeaderTransitioning ending");
1255     }
1256
1257     @SuppressWarnings({ "unchecked", "rawtypes" })
1258     @Test
1259     public void testReplicateWithPersistencePending() throws Exception {
1260         final String leaderId = factory.generateActorId("leader-");
1261         final String followerId = factory.generateActorId("follower-");
1262
1263         final ActorRef followerActor = factory.createActor(MessageCollectorActor.props());
1264
1265         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1266         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1267         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1268
1269         DataPersistenceProvider mockPersistenceProvider = mock(DataPersistenceProvider.class);
1270         doReturn(true).when(mockPersistenceProvider).isRecoveryApplicable();
1271
1272         TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
1273                 MockRaftActor.props(leaderId, Map.of(followerId, followerActor.path().toString()), config,
1274                         mockPersistenceProvider), leaderId);
1275         MockRaftActor leaderActor = leaderActorRef.underlyingActor();
1276         leaderActor.waitForInitializeBehaviorComplete();
1277
1278         leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
1279
1280         Leader leader = new Leader(leaderActor.getRaftActorContext());
1281         leaderActor.setCurrentBehavior(leader);
1282
1283         leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"),
1284                 false);
1285
1286         ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
1287         assertNotNull("ReplicatedLogEntry not found", logEntry);
1288         assertEquals("isPersistencePending", true, logEntry.isPersistencePending());
1289         assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
1290
1291         leaderActor.handleCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
1292         assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
1293
1294         ArgumentCaptor<Procedure> callbackCaptor = ArgumentCaptor.forClass(Procedure.class);
1295         verify(mockPersistenceProvider).persistAsync(eq(logEntry), callbackCaptor.capture());
1296
1297         callbackCaptor.getValue().apply(logEntry);
1298
1299         assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
1300         assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
1301     }
1302
1303     @Test
1304     public void testReplicateWithBatchHint() throws Exception {
1305         final String leaderId = factory.generateActorId("leader-");
1306         final String followerId = factory.generateActorId("follower-");
1307
1308         final ActorRef followerActor = factory.createActor(MessageCollectorActor.props());
1309
1310         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1311         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1312         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1313
1314         TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
1315                 MockRaftActor.props(leaderId, Map.of(followerId, followerActor.path().toString()), config),
1316                     leaderId);
1317         MockRaftActor leaderActor = leaderActorRef.underlyingActor();
1318         leaderActor.waitForInitializeBehaviorComplete();
1319
1320         leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
1321
1322         Leader leader = new Leader(leaderActor.getRaftActorContext());
1323         leaderActor.setCurrentBehavior(leader);
1324
1325         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1326         MessageCollectorActor.clearMessages(followerActor);
1327
1328         leaderActor.handleCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0));
1329
1330         leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true);
1331         MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
1332
1333         leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true);
1334         MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
1335
1336         leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false);
1337         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1338         assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
1339     }
1340
1341     @Test
1342     @SuppressWarnings("checkstyle:illegalcatch")
1343     public void testApplyStateRace() throws Exception {
1344         final String leaderId = factory.generateActorId("leader-");
1345         final String followerId = factory.generateActorId("follower-");
1346
1347         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1348         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1349         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1350
1351         ActorRef mockFollowerActorRef = factory.createActor(MessageCollectorActor.props());
1352
1353         TestRaftActor.Builder builder = TestRaftActor.newBuilder()
1354                 .id(leaderId)
1355                 .peerAddresses(Map.of(followerId, mockFollowerActorRef.path().toString()))
1356                 .config(config)
1357                 .collectorActor(factory.createActor(
1358                         MessageCollectorActor.props(), factory.generateActorId(leaderId + "-collector")));
1359
1360         TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
1361                 builder.props(), leaderId);
1362         MockRaftActor leaderActor = leaderActorRef.underlyingActor();
1363         leaderActor.waitForInitializeBehaviorComplete();
1364
1365         leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
1366         Leader leader = new Leader(leaderActor.getRaftActorContext());
1367         leaderActor.setCurrentBehavior(leader);
1368
1369         final ExecutorService executorService = Executors.newSingleThreadExecutor();
1370
1371         leaderActor.setPersistence(new PersistentDataProvider(leaderActor) {
1372             @Override
1373             public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
1374                 // needs to be executed from another thread to simulate the persistence actor calling this callback
1375                 executorService.submit(() -> {
1376                     try {
1377                         procedure.apply(entry);
1378                     } catch (Exception e) {
1379                         TEST_LOG.info("Fail during async persist callback", e);
1380                     }
1381                 }, "persistence-callback");
1382             }
1383         });
1384
1385         leader.getFollower(followerId).setNextIndex(0);
1386         leader.getFollower(followerId).setMatchIndex(-1);
1387
1388         // hitting this is flimsy so run multiple times to improve the chance of things
1389         // blowing up while breaking actor containment
1390         final TestPersist message =
1391                 new TestPersist(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"));
1392         for (int i = 0; i < 100; i++) {
1393             leaderActorRef.tell(message, null);
1394
1395             AppendEntriesReply reply =
1396                     new AppendEntriesReply(followerId, 1, true, i, 1, (short) 5);
1397             leaderActorRef.tell(reply, mockFollowerActorRef);
1398         }
1399
1400         await("Persistence callback.").atMost(5, TimeUnit.SECONDS).until(() -> leaderActor.getState().size() == 100);
1401         executorService.shutdown();
1402     }
1403 }