d57c354e5a0fbf7f4dcdcca4bb3c246763d6db4a
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertArrayEquals;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNotSame;
15 import static org.junit.Assert.assertNull;
16 import static org.junit.Assert.assertSame;
17 import static org.junit.Assert.assertTrue;
18 import static org.mockito.Matchers.any;
19 import static org.mockito.Matchers.anyObject;
20 import static org.mockito.Matchers.eq;
21 import static org.mockito.Matchers.same;
22 import static org.mockito.Mockito.doReturn;
23 import static org.mockito.Mockito.mock;
24 import static org.mockito.Mockito.never;
25 import static org.mockito.Mockito.reset;
26 import static org.mockito.Mockito.timeout;
27 import static org.mockito.Mockito.verify;
28
29 import akka.actor.ActorRef;
30 import akka.actor.PoisonPill;
31 import akka.actor.Props;
32 import akka.actor.Status.Failure;
33 import akka.actor.Terminated;
34 import akka.dispatch.Dispatchers;
35 import akka.japi.Procedure;
36 import akka.persistence.SaveSnapshotFailure;
37 import akka.persistence.SaveSnapshotSuccess;
38 import akka.persistence.SnapshotMetadata;
39 import akka.persistence.SnapshotOffer;
40 import akka.testkit.JavaTestKit;
41 import akka.testkit.TestActorRef;
42 import com.google.common.base.Optional;
43 import com.google.common.collect.ImmutableMap;
44 import com.google.common.util.concurrent.Uninterruptibles;
45 import com.google.protobuf.ByteString;
46 import java.io.ByteArrayOutputStream;
47 import java.io.ObjectOutputStream;
48 import java.util.ArrayList;
49 import java.util.Arrays;
50 import java.util.Collections;
51 import java.util.HashMap;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.concurrent.TimeUnit;
55 import java.util.concurrent.TimeoutException;
56 import org.apache.commons.lang3.SerializationUtils;
57 import org.junit.After;
58 import org.junit.Before;
59 import org.junit.Test;
60 import org.mockito.ArgumentCaptor;
61 import org.opendaylight.controller.cluster.DataPersistenceProvider;
62 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
63 import org.opendaylight.controller.cluster.PersistentDataProvider;
64 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
65 import org.opendaylight.controller.cluster.notifications.RoleChanged;
66 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
67 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
68 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
69 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
70 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
71 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
72 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
73 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
74 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
75 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
76 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
77 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
78 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
79 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
80 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
81 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
82 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
83 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
84 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
85 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
86 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
87 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
88 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
89 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
90 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
91 import org.opendaylight.yangtools.concepts.Identifier;
92 import org.slf4j.Logger;
93 import org.slf4j.LoggerFactory;
94 import scala.concurrent.duration.Duration;
95 import scala.concurrent.duration.FiniteDuration;
96
97 public class RaftActorTest extends AbstractActorTest {
98
99     static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
100
101     private TestActorFactory factory;
102
103     @Before
104     public void setUp() {
105         factory = new TestActorFactory(getSystem());
106     }
107
108     @After
109     public void tearDown() throws Exception {
110         factory.close();
111         InMemoryJournal.clear();
112         InMemorySnapshotStore.clear();
113     }
114
115     @Test
116     public void testConstruction() {
117         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
118     }
119
120     @Test
121     public void testFindLeaderWhenLeaderIsSelf() {
122         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
123         kit.waitUntilLeader();
124     }
125
126
127     @Test
128     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
129         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
130
131         JavaTestKit kit = new JavaTestKit(getSystem());
132         String persistenceId = factory.generateActorId("follower-");
133
134         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
135
136         // Set the heartbeat interval high to essentially disable election otherwise the test
137         // may fail if the actor is switched to Leader and the commitIndex is set to the last
138         // log entry.
139         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
140
141         ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder()
142                 .put("member1", "address").build();
143         ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
144                 peerAddresses, config), persistenceId);
145
146         kit.watch(followerActor);
147
148         List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
149         ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
150                 new MockRaftActorContext.MockPayload("E"));
151         snapshotUnappliedEntries.add(entry1);
152
153         int lastAppliedDuringSnapshotCapture = 3;
154         int lastIndexDuringSnapshotCapture = 4;
155
156         // 4 messages as part of snapshot, which are applied to state
157         ByteString snapshotBytes = fromObject(Arrays.asList(
158                 new MockRaftActorContext.MockPayload("A"),
159                 new MockRaftActorContext.MockPayload("B"),
160                 new MockRaftActorContext.MockPayload("C"),
161                 new MockRaftActorContext.MockPayload("D")));
162
163         Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
164                 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
165                 lastAppliedDuringSnapshotCapture, 1);
166         InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
167
168         // add more entries after snapshot is taken
169         List<ReplicatedLogEntry> entries = new ArrayList<>();
170         ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
171                 new MockRaftActorContext.MockPayload("F", 2));
172         ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
173                 new MockRaftActorContext.MockPayload("G", 3));
174         ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
175                 new MockRaftActorContext.MockPayload("H", 4));
176         entries.add(entry2);
177         entries.add(entry3);
178         entries.add(entry4);
179
180         final int lastAppliedToState = 5;
181         final int lastIndex = 7;
182
183         InMemoryJournal.addEntry(persistenceId, 5, entry2);
184         // 2 entries are applied to state besides the 4 entries in snapshot
185         InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
186         InMemoryJournal.addEntry(persistenceId, 7, entry3);
187         InMemoryJournal.addEntry(persistenceId, 8, entry4);
188
189         // kill the actor
190         followerActor.tell(PoisonPill.getInstance(), null);
191         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
192
193         kit.unwatch(followerActor);
194
195         //reinstate the actor
196         TestActorRef<MockRaftActor> ref = factory.createTestActor(
197                 MockRaftActor.props(persistenceId, peerAddresses, config));
198
199         MockRaftActor mockRaftActor = ref.underlyingActor();
200
201         mockRaftActor.waitForRecoveryComplete();
202
203         RaftActorContext context = mockRaftActor.getRaftActorContext();
204         assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
205                 context.getReplicatedLog().size());
206         assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
207         assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
208         assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
209         assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
210         assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
211
212         mockRaftActor.waitForInitializeBehaviorComplete();
213
214         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
215
216         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
217     }
218
219     @Test
220     public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
221         String persistenceId = factory.generateActorId("follower-");
222
223         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
224
225         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
226
227         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
228                 ImmutableMap.<String, String>builder().put("member1", "address").build(),
229                 config, new NonPersistentDataProvider()), persistenceId);
230
231         MockRaftActor mockRaftActor = ref.underlyingActor();
232
233         mockRaftActor.waitForRecoveryComplete();
234
235         mockRaftActor.waitForInitializeBehaviorComplete();
236
237         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
238     }
239
240     @Test
241     public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
242         final JavaTestKit kit = new JavaTestKit(getSystem());
243         String persistenceId = factory.generateActorId("follower-");
244         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
245         config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
246         config.setElectionTimeoutFactor(1);
247
248         InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
249
250         TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
251                 ImmutableMap.<String, String>builder().put("member1", "address").build(),
252                 config, new NonPersistentDataProvider())
253                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
254
255         InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
256         List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
257         assertEquals("UpdateElectionTerm entries", 1, entries.size());
258         final UpdateElectionTerm updateEntry = entries.get(0);
259
260         factory.killActor(ref, kit);
261
262         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
263         ref = factory.createTestActor(MockRaftActor.props(persistenceId,
264                 ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
265                 new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()),
266                 factory.generateActorId("follower-"));
267
268         MockRaftActor actor = ref.underlyingActor();
269         actor.waitForRecoveryComplete();
270
271         RaftActorContext newContext = actor.getRaftActorContext();
272         assertEquals("electionTerm", updateEntry.getCurrentTerm(),
273                 newContext.getTermInformation().getCurrentTerm());
274         assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
275
276         entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
277         assertEquals("UpdateElectionTerm entries", 1, entries.size());
278     }
279
280     @Test
281     public void testRaftActorForwardsToRaftActorRecoverySupport() {
282         String persistenceId = factory.generateActorId("leader-");
283
284         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
285
286         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
287
288         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
289                 Collections.<String, String>emptyMap(), config), persistenceId);
290
291         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
292
293         // Wait for akka's recovery to complete so it doesn't interfere.
294         mockRaftActor.waitForRecoveryComplete();
295
296         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
297         mockRaftActor.setRaftActorRecoverySupport(mockSupport );
298
299         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
300         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
301         mockRaftActor.handleRecover(snapshotOffer);
302
303         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
304                 1, new MockRaftActorContext.MockPayload("1", 5));
305         mockRaftActor.handleRecover(logEntry);
306
307         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
308         mockRaftActor.handleRecover(applyJournalEntries);
309
310         DeleteEntries deleteEntries = new DeleteEntries(1);
311         mockRaftActor.handleRecover(deleteEntries);
312
313         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
314         mockRaftActor.handleRecover(updateElectionTerm);
315
316         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
317         verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
318         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
319         verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
320         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
321     }
322
323     @Test
324     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
325         String persistenceId = factory.generateActorId("leader-");
326
327         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
328
329         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
330
331         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
332
333         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
334                 .config(config).snapshotMessageSupport(mockSupport).props());
335
336         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
337
338         // Wait for akka's recovery to complete so it doesn't interfere.
339         mockRaftActor.waitForRecoveryComplete();
340
341         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
342         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
343         mockRaftActor.handleCommand(applySnapshot);
344
345         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
346         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
347         mockRaftActor.handleCommand(captureSnapshot);
348
349         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
350         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
351         mockRaftActor.handleCommand(captureSnapshotReply);
352
353         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L));
354         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
355         mockRaftActor.handleCommand(saveSnapshotSuccess);
356
357         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L),
358                 new Throwable());
359         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
360         mockRaftActor.handleCommand(saveSnapshotFailure);
361
362         doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
363                 any(ActorRef.class));
364         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
365
366         doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
367         mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
368
369         verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
370         verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
371         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
372         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
373         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
374         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
375                 any(ActorRef.class));
376         verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
377     }
378
379     @SuppressWarnings("unchecked")
380     @Test
381     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
382         String persistenceId = factory.generateActorId("leader-");
383
384         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
385
386         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
387
388         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
389
390         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
391                 Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
392
393         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
394
395         mockRaftActor.waitForInitializeBehaviorComplete();
396
397         mockRaftActor.waitUntilLeader();
398
399         mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
400
401         verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Procedure.class));
402     }
403
404     @Test
405     public void testApplyState() throws Exception {
406         String persistenceId = factory.generateActorId("leader-");
407
408         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
409
410         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
411
412         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
413
414         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
415                 Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
416
417         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
418
419         mockRaftActor.waitForInitializeBehaviorComplete();
420
421         ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
422                 new MockRaftActorContext.MockPayload("F"));
423
424         final Identifier id = new MockIdentifier("apply-state");
425         mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
426
427         verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
428     }
429
430     @Test
431     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
432         TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
433                 Props.create(MessageCollectorActor.class));
434         MessageCollectorActor.waitUntilReady(notifierActor);
435
436         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
437         long heartBeatInterval = 100;
438         config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
439         config.setElectionTimeoutFactor(20);
440
441         String persistenceId = factory.generateActorId("notifier-");
442
443         final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
444                 .id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
445                         new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
446                 persistenceId);
447
448         List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
449
450
451         // check if the notifier got a role change from null to Follower
452         RoleChanged raftRoleChanged = matches.get(0);
453         assertEquals(persistenceId, raftRoleChanged.getMemberId());
454         assertNull(raftRoleChanged.getOldRole());
455         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
456
457         // check if the notifier got a role change from Follower to Candidate
458         raftRoleChanged = matches.get(1);
459         assertEquals(persistenceId, raftRoleChanged.getMemberId());
460         assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
461         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
462
463         // check if the notifier got a role change from Candidate to Leader
464         raftRoleChanged = matches.get(2);
465         assertEquals(persistenceId, raftRoleChanged.getMemberId());
466         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
467         assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
468
469         LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
470                 notifierActor, LeaderStateChanged.class);
471
472         assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
473         assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
474
475         notifierActor.underlyingActor().clear();
476
477         MockRaftActor raftActor = raftActorRef.underlyingActor();
478         final String newLeaderId = "new-leader";
479         final short newLeaderVersion = 6;
480         Follower follower = new Follower(raftActor.getRaftActorContext()) {
481             @Override
482             public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
483                 setLeaderId(newLeaderId);
484                 setLeaderPayloadVersion(newLeaderVersion);
485                 return this;
486             }
487         };
488
489         raftActor.newBehavior(follower);
490
491         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
492         assertEquals(persistenceId, leaderStateChange.getMemberId());
493         assertEquals(null, leaderStateChange.getLeaderId());
494
495         raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
496         assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
497         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
498
499         notifierActor.underlyingActor().clear();
500
501         raftActor.handleCommand("any");
502
503         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
504         assertEquals(persistenceId, leaderStateChange.getMemberId());
505         assertEquals(newLeaderId, leaderStateChange.getLeaderId());
506         assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
507
508         notifierActor.underlyingActor().clear();
509
510         raftActor.handleCommand("any");
511
512         Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
513         leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
514         assertNull(leaderStateChange);
515     }
516
517     @Test
518     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
519         ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
520         MessageCollectorActor.waitUntilReady(notifierActor);
521
522         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
523         long heartBeatInterval = 100;
524         config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
525         config.setElectionTimeoutFactor(1);
526
527         String persistenceId = factory.generateActorId("notifier-");
528
529         factory.createActor(MockRaftActor.builder().id(persistenceId)
530                 .peerAddresses(ImmutableMap.of("leader", "fake/path"))
531                 .config(config).roleChangeNotifier(notifierActor).props());
532
533         List<RoleChanged> matches =  null;
534         for (int i = 0; i < 5000 / heartBeatInterval; i++) {
535             matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
536             assertNotNull(matches);
537             if (matches.size() == 3) {
538                 break;
539             }
540             Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
541         }
542
543         assertNotNull(matches);
544         assertEquals(2, matches.size());
545
546         // check if the notifier got a role change from null to Follower
547         RoleChanged raftRoleChanged = matches.get(0);
548         assertEquals(persistenceId, raftRoleChanged.getMemberId());
549         assertNull(raftRoleChanged.getOldRole());
550         assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
551
552         // check if the notifier got a role change from Follower to Candidate
553         raftRoleChanged = matches.get(1);
554         assertEquals(persistenceId, raftRoleChanged.getMemberId());
555         assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
556         assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
557     }
558
559     @Test
560     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
561         final String persistenceId = factory.generateActorId("leader-");
562         final String follower1Id = factory.generateActorId("follower-");
563
564         ActorRef followerActor1 =
565                 factory.createActor(Props.create(MessageCollectorActor.class));
566
567         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
568         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
569         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
570
571         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
572
573         Map<String, String> peerAddresses = new HashMap<>();
574         peerAddresses.put(follower1Id, followerActor1.path().toString());
575
576         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
577                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
578
579         MockRaftActor leaderActor = mockActorRef.underlyingActor();
580
581         leaderActor.getRaftActorContext().setCommitIndex(4);
582         leaderActor.getRaftActorContext().setLastApplied(4);
583         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
584
585         leaderActor.waitForInitializeBehaviorComplete();
586
587         // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
588
589         Leader leader = new Leader(leaderActor.getRaftActorContext());
590         leaderActor.setCurrentBehavior(leader);
591         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
592
593         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
594         leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
595
596         assertEquals(8, leaderActor.getReplicatedLog().size());
597
598         leaderActor.getRaftActorContext().getSnapshotManager().capture(
599                 new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("x")), 4);
600
601         verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
602
603         assertEquals(8, leaderActor.getReplicatedLog().size());
604
605         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
606         //fake snapshot on index 5
607         leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
608
609         assertEquals(8, leaderActor.getReplicatedLog().size());
610
611         //fake snapshot on index 6
612         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
613         leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
614         assertEquals(8, leaderActor.getReplicatedLog().size());
615
616         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
617
618         assertEquals(8, leaderActor.getReplicatedLog().size());
619
620         ByteString snapshotBytes = fromObject(Arrays.asList(
621                 new MockRaftActorContext.MockPayload("foo-0"),
622                 new MockRaftActorContext.MockPayload("foo-1"),
623                 new MockRaftActorContext.MockPayload("foo-2"),
624                 new MockRaftActorContext.MockPayload("foo-3"),
625                 new MockRaftActorContext.MockPayload("foo-4")));
626
627         leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
628                 Runtime.getRuntime().totalMemory());
629
630         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
631
632         // The commit is needed to complete the snapshot creation process
633         leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
634
635         // capture snapshot reply should remove the snapshotted entries only
636         assertEquals(3, leaderActor.getReplicatedLog().size());
637         assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
638
639         // add another non-replicated entry
640         leaderActor.getReplicatedLog().append(
641                 new SimpleReplicatedLogEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
642
643         //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
644         leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
645         assertEquals(2, leaderActor.getReplicatedLog().size());
646         assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
647     }
648
649     @Test
650     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
651         final String persistenceId = factory.generateActorId("follower-");
652         final String leaderId = factory.generateActorId("leader-");
653
654
655         ActorRef leaderActor1 =
656                 factory.createActor(Props.create(MessageCollectorActor.class));
657
658         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
659         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
660         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
661
662         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
663
664         Map<String, String> peerAddresses = new HashMap<>();
665         peerAddresses.put(leaderId, leaderActor1.path().toString());
666
667         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
668                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
669
670         MockRaftActor followerActor = mockActorRef.underlyingActor();
671         followerActor.getRaftActorContext().setCommitIndex(4);
672         followerActor.getRaftActorContext().setLastApplied(4);
673         followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
674
675         followerActor.waitForInitializeBehaviorComplete();
676
677
678         Follower follower = new Follower(followerActor.getRaftActorContext());
679         followerActor.setCurrentBehavior(follower);
680         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
681
682         // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
683         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
684         followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
685
686         // log has indices 0-5
687         assertEquals(6, followerActor.getReplicatedLog().size());
688
689         //snapshot on 4
690         followerActor.getRaftActorContext().getSnapshotManager().capture(
691                 new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
692                         new MockRaftActorContext.MockPayload("D")), 4);
693
694         verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
695
696         assertEquals(6, followerActor.getReplicatedLog().size());
697
698         //fake snapshot on index 6
699         List<ReplicatedLogEntry> entries =
700                 Arrays.asList(
701                         (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
702                                 new MockRaftActorContext.MockPayload("foo-6"))
703                         );
704         followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
705         assertEquals(7, followerActor.getReplicatedLog().size());
706
707         //fake snapshot on index 7
708         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
709
710         entries =
711                 Arrays.asList(
712                         (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
713                                 new MockRaftActorContext.MockPayload("foo-7"))
714                         );
715         followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
716         assertEquals(8, followerActor.getReplicatedLog().size());
717
718         assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
719
720
721         ByteString snapshotBytes = fromObject(Arrays.asList(
722                 new MockRaftActorContext.MockPayload("foo-0"),
723                 new MockRaftActorContext.MockPayload("foo-1"),
724                 new MockRaftActorContext.MockPayload("foo-2"),
725                 new MockRaftActorContext.MockPayload("foo-3"),
726                 new MockRaftActorContext.MockPayload("foo-4")));
727         followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
728         assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
729
730         // The commit is needed to complete the snapshot creation process
731         followerActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
732
733         // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
734         assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
735         assertEquals(7, followerActor.getReplicatedLog().lastIndex());
736
737         entries =
738                 Arrays.asList(
739                         (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
740                                 new MockRaftActorContext.MockPayload("foo-7"))
741                         );
742         // send an additional entry 8 with leaderCommit = 7
743         followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
744
745         // 7 and 8, as lastapplied is 7
746         assertEquals(2, followerActor.getReplicatedLog().size());
747     }
748
749     @Test
750     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
751         final String persistenceId = factory.generateActorId("leader-");
752         final String follower1Id = factory.generateActorId("follower-");
753         final String follower2Id = factory.generateActorId("follower-");
754
755         final ActorRef followerActor1 = factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
756         final ActorRef followerActor2 = factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
757
758         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
759         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
760         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
761
762         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
763
764         Map<String, String> peerAddresses = new HashMap<>();
765         peerAddresses.put(follower1Id, followerActor1.path().toString());
766         peerAddresses.put(follower2Id, followerActor2.path().toString());
767
768         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
769                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
770
771         MockRaftActor leaderActor = mockActorRef.underlyingActor();
772         leaderActor.getRaftActorContext().setCommitIndex(9);
773         leaderActor.getRaftActorContext().setLastApplied(9);
774         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
775
776         leaderActor.waitForInitializeBehaviorComplete();
777
778         Leader leader = new Leader(leaderActor.getRaftActorContext());
779         leaderActor.setCurrentBehavior(leader);
780         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
781
782         // create 5 entries in the log
783         MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
784         leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
785
786         //set the snapshot index to 4 , 0 to 4 are snapshotted
787         leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
788         //setting replicatedToAllIndex = 9, for the log to clear
789         leader.setReplicatedToAllIndex(9);
790         assertEquals(5, leaderActor.getReplicatedLog().size());
791         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
792
793         leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
794         assertEquals(5, leaderActor.getReplicatedLog().size());
795         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
796
797         // set the 2nd follower nextIndex to 1 which has been snapshotted
798         leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
799         assertEquals(5, leaderActor.getReplicatedLog().size());
800         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
801
802         // simulate a real snapshot
803         leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE);
804         assertEquals(5, leaderActor.getReplicatedLog().size());
805         assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
806                 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId()),
807                 RaftState.Leader, leaderActor.getCurrentBehavior().state());
808
809
810         //reply from a slow follower does not initiate a fake snapshot
811         leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
812         assertEquals("Fake snapshot should not happen when Initiate is in progress", 5,
813                 leaderActor.getReplicatedLog().size());
814
815         ByteString snapshotBytes = fromObject(Arrays.asList(
816                 new MockRaftActorContext.MockPayload("foo-0"),
817                 new MockRaftActorContext.MockPayload("foo-1"),
818                 new MockRaftActorContext.MockPayload("foo-2"),
819                 new MockRaftActorContext.MockPayload("foo-3"),
820                 new MockRaftActorContext.MockPayload("foo-4")));
821         leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
822         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
823
824         assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0,
825                 leaderActor.getReplicatedLog().size());
826
827         //reply from a slow follower after should not raise errors
828         leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
829         assertEquals(0, leaderActor.getReplicatedLog().size());
830     }
831
832     @Test
833     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
834         String persistenceId = factory.generateActorId("leader-");
835         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
836         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
837         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
838         config.setSnapshotBatchCount(5);
839
840         DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
841
842         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
843
844         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
845                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
846
847         MockRaftActor leaderActor = mockActorRef.underlyingActor();
848         leaderActor.getRaftActorContext().setCommitIndex(3);
849         leaderActor.getRaftActorContext().setLastApplied(3);
850         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
851
852         leaderActor.waitForInitializeBehaviorComplete();
853         for (int i = 0; i < 4; i++) {
854             leaderActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
855                     new MockRaftActorContext.MockPayload("A")));
856         }
857
858         Leader leader = new Leader(leaderActor.getRaftActorContext());
859         leaderActor.setCurrentBehavior(leader);
860         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
861
862         // Simulate an install snaphost to a follower.
863         leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
864                 leaderActor.getReplicatedLog().last(), -1, "member1");
865
866         // Now send a CaptureSnapshotReply
867         mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
868
869         // Trimming log in this scenario is a no-op
870         assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
871         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
872         assertEquals(-1, leader.getReplicatedToAllIndex());
873     }
874
875     @Test
876     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
877         String persistenceId = factory.generateActorId("leader-");
878         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
879         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
880         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
881         config.setSnapshotBatchCount(5);
882
883         DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
884
885         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
886
887         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
888                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
889
890         MockRaftActor leaderActor = mockActorRef.underlyingActor();
891         leaderActor.getRaftActorContext().setCommitIndex(3);
892         leaderActor.getRaftActorContext().setLastApplied(3);
893         leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
894         leaderActor.getReplicatedLog().setSnapshotIndex(3);
895
896         leaderActor.waitForInitializeBehaviorComplete();
897         Leader leader = new Leader(leaderActor.getRaftActorContext());
898         leaderActor.setCurrentBehavior(leader);
899         leader.setReplicatedToAllIndex(3);
900         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
901
902         // Persist another entry (this will cause a CaptureSnapshot to be triggered
903         leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
904                 new MockRaftActorContext.MockPayload("duh"));
905
906         // Now send a CaptureSnapshotReply
907         mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
908
909         // Trimming log in this scenario is a no-op
910         assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
911         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
912         assertEquals(3, leader.getReplicatedToAllIndex());
913     }
914
915     @Test
916     public void testSwitchBehavior() {
917         String persistenceId = factory.generateActorId("leader-");
918         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
919         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
920         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
921         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
922         config.setSnapshotBatchCount(5);
923
924         DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
925
926         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
927
928         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
929                 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
930
931         MockRaftActor leaderActor = mockActorRef.underlyingActor();
932
933         leaderActor.waitForRecoveryComplete();
934
935         leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
936
937         assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
938         assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
939
940         leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
941
942         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
943         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
944
945         leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
946
947         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
948         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
949
950         leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
951
952         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
953         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
954     }
955
956     public static ByteString fromObject(Object snapshot) throws Exception {
957         ByteArrayOutputStream bos = null;
958         ObjectOutputStream os = null;
959         try {
960             bos = new ByteArrayOutputStream();
961             os = new ObjectOutputStream(bos);
962             os.writeObject(snapshot);
963             byte[] snapshotBytes = bos.toByteArray();
964             return ByteString.copyFrom(snapshotBytes);
965         } finally {
966             if (os != null) {
967                 os.flush();
968                 os.close();
969             }
970             if (bos != null) {
971                 bos.close();
972             }
973         }
974     }
975
976     @Test
977     public void testUpdateConfigParam() throws Exception {
978         DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
979         String persistenceId = factory.generateActorId("follower-");
980         ImmutableMap<String, String> peerAddresses =
981             ImmutableMap.<String, String>builder().put("member1", "address").build();
982         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
983
984         TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
985                 MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
986         MockRaftActor mockRaftActor = actorRef.underlyingActor();
987         mockRaftActor.waitForInitializeBehaviorComplete();
988
989         RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior();
990         mockRaftActor.updateConfigParams(emptyConfig);
991         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
992         assertEquals("Behavior State", RaftState.Follower,
993             mockRaftActor.getCurrentBehavior().state());
994
995         DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl();
996         disableConfig.setCustomRaftPolicyImplementationClass(
997             "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
998         mockRaftActor.updateConfigParams(disableConfig);
999         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
1000         assertEquals("Behavior State", RaftState.Follower,
1001             mockRaftActor.getCurrentBehavior().state());
1002
1003         behavior = mockRaftActor.getCurrentBehavior();
1004         mockRaftActor.updateConfigParams(disableConfig);
1005         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1006         assertEquals("Behavior State", RaftState.Follower,
1007             mockRaftActor.getCurrentBehavior().state());
1008
1009         DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl();
1010         defaultConfig.setCustomRaftPolicyImplementationClass(
1011             "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy");
1012         mockRaftActor.updateConfigParams(defaultConfig);
1013         assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
1014         assertEquals("Behavior State", RaftState.Follower,
1015             mockRaftActor.getCurrentBehavior().state());
1016
1017         behavior = mockRaftActor.getCurrentBehavior();
1018         mockRaftActor.updateConfigParams(defaultConfig);
1019         assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1020         assertEquals("Behavior State", RaftState.Follower,
1021             mockRaftActor.getCurrentBehavior().state());
1022     }
1023
1024     @Test
1025     public void testGetSnapshot() throws Exception {
1026         TEST_LOG.info("testGetSnapshot starting");
1027
1028         final JavaTestKit kit = new JavaTestKit(getSystem());
1029
1030         String persistenceId = factory.generateActorId("test-actor-");
1031         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1032         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1033
1034         long term = 3;
1035         long seqN = 1;
1036         InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
1037         InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0,
1038                 new MockRaftActorContext.MockPayload("A")));
1039         InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1,
1040                 new MockRaftActorContext.MockPayload("B")));
1041         InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
1042         InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2,
1043                 new MockRaftActorContext.MockPayload("C")));
1044
1045         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
1046                 ImmutableMap.<String, String>builder().put("member1", "address").build(), config)
1047                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1048         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1049
1050         mockRaftActor.waitForRecoveryComplete();
1051
1052         mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
1053
1054         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1055
1056         ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
1057         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
1058
1059         byte[] stateSnapshot = new byte[]{1,2,3};
1060         replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
1061
1062         GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
1063
1064         assertEquals("getId", persistenceId, reply.getId());
1065         Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
1066         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1067         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1068         assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
1069         assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
1070         assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
1071         assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
1072         assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
1073         assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
1074         assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
1075
1076         // Test with timeout
1077
1078         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(
1079                 Duration.create(200, TimeUnit.MILLISECONDS));
1080         reset(mockRaftActor.snapshotCohortDelegate);
1081
1082         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1083         Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
1084         assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
1085
1086         mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
1087
1088         // Test with persistence disabled.
1089
1090         mockRaftActor.setPersistence(false);
1091         reset(mockRaftActor.snapshotCohortDelegate);
1092
1093         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1094         reply = kit.expectMsgClass(GetSnapshotReply.class);
1095         verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
1096
1097         assertEquals("getId", persistenceId, reply.getId());
1098         replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
1099         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1100         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1101         assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
1102         assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
1103         assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
1104         assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
1105         assertEquals("getState length", 0, replySnapshot.getState().length);
1106         assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
1107
1108         TEST_LOG.info("testGetSnapshot ending");
1109     }
1110
1111     @Test
1112     public void testRestoreFromSnapshot() throws Exception {
1113         TEST_LOG.info("testRestoreFromSnapshot starting");
1114
1115         String persistenceId = factory.generateActorId("test-actor-");
1116         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1117         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1118
1119         List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
1120         snapshotUnappliedEntries.add(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E")));
1121
1122         int snapshotLastApplied = 3;
1123         int snapshotLastIndex = 4;
1124
1125         List<MockPayload> state = Arrays.asList(
1126                 new MockRaftActorContext.MockPayload("A"),
1127                 new MockRaftActorContext.MockPayload("B"),
1128                 new MockRaftActorContext.MockPayload("C"),
1129                 new MockRaftActorContext.MockPayload("D"));
1130         ByteString stateBytes = fromObject(state);
1131
1132         Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries,
1133                 snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1");
1134
1135         InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
1136
1137         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1138                 .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props()
1139                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1140         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1141
1142         mockRaftActor.waitForRecoveryComplete();
1143
1144         Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class);
1145         assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm());
1146         assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor());
1147         assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex());
1148         assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
1149         assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
1150         assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
1151         assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState());
1152         assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
1153
1154         verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class));
1155
1156         RaftActorContext context = mockRaftActor.getRaftActorContext();
1157         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1158         assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
1159         assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
1160         assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
1161         assertEquals("Recovered state", state, mockRaftActor.getState());
1162         assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
1163         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1164
1165         // Test with data persistence disabled
1166
1167         snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
1168                 -1, -1, -1, -1, 5, "member-1");
1169
1170         persistenceId = factory.generateActorId("test-actor-");
1171
1172         raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1173                 .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot))
1174                 .persistent(Optional.of(Boolean.FALSE)).props()
1175                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1176         mockRaftActor = raftActorRef.underlyingActor();
1177
1178         mockRaftActor.waitForRecoveryComplete();
1179         assertEquals("snapshot committed", true,
1180                 Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS));
1181
1182         context = mockRaftActor.getRaftActorContext();
1183         assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm());
1184         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1185
1186         TEST_LOG.info("testRestoreFromSnapshot ending");
1187     }
1188
1189     @Test
1190     public void testRestoreFromSnapshotWithRecoveredData() throws Exception {
1191         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting");
1192
1193         String persistenceId = factory.generateActorId("test-actor-");
1194         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1195         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1196
1197         List<MockPayload> state = Arrays.asList(new MockRaftActorContext.MockPayload("A"));
1198         Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.<ReplicatedLogEntry>asList(),
1199                 5, 2, 5, 2, 2, "member-1");
1200
1201         InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
1202                 new MockRaftActorContext.MockPayload("B")));
1203
1204         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1205                 .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props()
1206                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1207         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1208
1209         mockRaftActor.waitForRecoveryComplete();
1210
1211         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1212         verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(byte[].class));
1213
1214         RaftActorContext context = mockRaftActor.getRaftActorContext();
1215         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1216         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
1217         assertEquals("Last applied", -1, context.getLastApplied());
1218         assertEquals("Commit index", -1, context.getCommitIndex());
1219         assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
1220         assertEquals("Voted for", null, context.getTermInformation().getVotedFor());
1221
1222         TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
1223     }
1224
1225     @Test
1226     public void testNonVotingOnRecovery() throws Exception {
1227         TEST_LOG.info("testNonVotingOnRecovery starting");
1228
1229         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1230         config.setElectionTimeoutFactor(1);
1231         config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
1232
1233         String persistenceId = factory.generateActorId("test-actor-");
1234         InMemoryJournal.addEntry(persistenceId, 1,  new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
1235                 new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false)))));
1236
1237         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1238                 .config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1239         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1240
1241         mockRaftActor.waitForInitializeBehaviorComplete();
1242
1243         // Sleep a bit and verify it didn't get an election timeout and schedule an election.
1244
1245         Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
1246         assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
1247
1248         TEST_LOG.info("testNonVotingOnRecovery ending");
1249     }
1250
1251     @Test
1252     public void testLeaderTransitioning() throws Exception {
1253         TEST_LOG.info("testLeaderTransitioning starting");
1254
1255         TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
1256                 Props.create(MessageCollectorActor.class));
1257
1258         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1259         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1260
1261         String persistenceId = factory.generateActorId("test-actor-");
1262
1263         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
1264                 .config(config).roleChangeNotifier(notifierActor).props()
1265                 .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1266         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1267
1268         mockRaftActor.waitForInitializeBehaviorComplete();
1269
1270         raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.<ReplicatedLogEntry>emptyList(),
1271                 0L, -1L, (short)1), ActorRef.noSender());
1272         LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1273                 notifierActor, LeaderStateChanged.class);
1274         assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
1275
1276         MessageCollectorActor.clearMessages(notifierActor);
1277
1278         raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender());
1279
1280         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1281         assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
1282         assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
1283
1284         TEST_LOG.info("testLeaderTransitioning ending");
1285     }
1286
1287     @SuppressWarnings({ "unchecked", "rawtypes" })
1288     @Test
1289     public void testReplicateWithPersistencePending() throws Exception {
1290         final String leaderId = factory.generateActorId("leader-");
1291         final String followerId = factory.generateActorId("follower-");
1292
1293         final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
1294
1295         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1296         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1297         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1298
1299         DataPersistenceProvider mockPersistenceProvider = mock(DataPersistenceProvider.class);
1300         doReturn(true).when(mockPersistenceProvider).isRecoveryApplicable();
1301
1302         TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
1303                 MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config,
1304                         mockPersistenceProvider), leaderId);
1305         MockRaftActor leaderActor = leaderActorRef.underlyingActor();
1306         leaderActor.waitForInitializeBehaviorComplete();
1307
1308         leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
1309
1310         Leader leader = new Leader(leaderActor.getRaftActorContext());
1311         leaderActor.setCurrentBehavior(leader);
1312
1313         leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"));
1314
1315         ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
1316         assertNotNull("ReplicatedLogEntry not found", logEntry);
1317         assertEquals("isPersistencePending", true, logEntry.isPersistencePending());
1318         assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
1319
1320         leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
1321         assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
1322
1323         ArgumentCaptor<Procedure> callbackCaptor = ArgumentCaptor.forClass(Procedure.class);
1324         verify(mockPersistenceProvider).persistAsync(eq(logEntry), callbackCaptor.capture());
1325
1326         callbackCaptor.getValue().apply(logEntry);
1327
1328         assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
1329         assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
1330     }
1331 }