1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import scala.concurrent.duration.FiniteDuration;
48 public class LeaderTest extends AbstractLeaderTest {
// Peer id under which the follower actor is registered with the leader's context
// (see createActorContextWithFollower).
50 static final String FOLLOWER_ID = "follower";
// Test actor built from ForwardMessageToBehaviorActor props; it is handed to the leader's
// MockRaftActorContext and the tests inspect the messages it collects.
52 private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
53 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
// Test actor standing in for the follower peer; the tests inspect the messages the leader sends to it.
55 private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
56 Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
// Leader behavior under test; the visible tests each assign a new instance.
58 private Leader leader;
// Per-test teardown hook.
// NOTE(review): the body of this method is not visible in this excerpt, so its cleanup steps
// are intentionally not described here.
62 public void tearDown() throws Exception {
/**
 * Verifies that the Leader keeps its role when it is handed a message type it does not handle:
 * handleMessage is invoked with the plain string "foo" and the returned behavior must be a Leader.
 */
71 public void testHandleMessageForUnknownMessage() throws Exception {
72 logStart("testHandleMessageForUnknownMessage");
74 leader = new Leader(createActorContext());
76 // handleMessage should return the Leader behavior when it receives an unknown message ("foo" here).
78 RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
79 Assert.assertTrue(behavior instanceof Leader);
/**
 * Checks the AppendEntries traffic the leader sends to its single follower.
 * <p>
 * The AppendEntries captured right after the Leader is constructed must carry no entries and
 * prevLogIndex/prevLogTerm of -1. After a successful AppendEntriesReply reporting
 * {@code lastIndex - 1} is fed back in, a SendHeartBeat must produce an AppendEntries whose
 * prevLogIndex is {@code lastIndex - 1} and which carries exactly the entry at {@code lastIndex}.
 */
83 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
84 logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): the declaration of `term` is not visible in this excerpt.
89 actorContext.getTermInformation().update(term, "");
91 leader = new Leader(actorContext);
93 // Leader should send an immediate heartbeat with no entries as follower is inactive.
94 long lastIndex = actorContext.getReplicatedLog().lastIndex();
95 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
96 assertEquals("getTerm", term, appendEntries.getTerm());
97 assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
98 assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
99 assertEquals("Entries size", 0, appendEntries.getEntries().size());
101 // The follower would normally reply - simulate that explicitly here.
// The reply reports lastIndex - 1, i.e. the follower is one entry behind the leader's log.
102 leader.handleMessage(followerActor, new AppendEntriesReply(
103 FOLLOWER_ID, term, true, lastIndex - 1, term));
104 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Drop the messages collected so far so the next expectFirstMatching only sees new traffic.
106 followerActor.underlyingActor().clear();
108 // Sleep for the heartbeat interval so AppendEntries is sent.
109 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
110 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
112 leader.handleMessage(leaderActor, new SendHeartBeat());
// This AppendEntries is expected to carry the single entry the follower is missing.
114 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
115 assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
116 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
117 assertEquals("Entries size", 1, appendEntries.getEntries().size());
118 assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
119 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
/**
 * Verifies that handling a Replicate message makes the leader send the new log entry to an
 * active follower.
 * <p>
 * The follower is first brought up to date with a simulated AppendEntriesReply reporting
 * {@code lastIndex}. A new entry at {@code lastIndex + 1} is then appended to the leader's log and
 * replicated; the AppendEntries received by the follower must have prevLogIndex {@code lastIndex}
 * and contain exactly that entry (index, term and payload).
 */
123 public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
124 logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
126 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): the declaration of `term` is not visible in this excerpt.
129 actorContext.getTermInformation().update(term, "");
131 leader = new Leader(actorContext);
133 // Leader will send an immediate heartbeat - ignore it.
134 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
136 // The follower would normally reply - simulate that explicitly here.
137 long lastIndex = actorContext.getReplicatedLog().lastIndex();
138 leader.handleMessage(followerActor, new AppendEntriesReply(
139 FOLLOWER_ID, term, true, lastIndex, term));
140 assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
// Discard the initial heartbeat so only the replication-triggered AppendEntries is examined below.
142 followerActor.underlyingActor().clear();
144 MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
// New entry placed directly after the current last index; the first constructor argument (1) is
// later compared against `term` via getTerm().
145 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
146 1, lastIndex + 1, payload);
147 actorContext.getReplicatedLog().append(newEntry);
148 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
149 new Replicate(null, null, newEntry));
151 // State should not change
152 assertTrue(raftBehavior instanceof Leader);
154 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
155 assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
156 assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
157 assertEquals("Entries size", 1, appendEntries.getEntries().size());
158 assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
159 assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
160 assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
/**
 * Verifies Replicate handling when the leader's context has no peers configured.
 * <p>
 * After a new entry is appended and replicated, the context's commit index must equal the new
 * entry's index, and the leader actor must have collected one ApplyState per index from 1 up to
 * the new index; the last one must carry the new entry's data and the "state-id" identifier.
 */
164 public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
165 logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
167 MockRaftActorContext actorContext = createActorContext();
169 leader = new Leader(actorContext);
// lastApplied is forced to 0 so that earlier entries are applied as well (see the comment below).
171 actorContext.setLastApplied(0);
173 long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
174 long term = actorContext.getTermInformation().getCurrentTerm();
175 MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
176 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
178 actorContext.getReplicatedLog().append(newEntry);
180 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
181 new Replicate(leaderActor, "state-id", newEntry));
183 // State should not change
184 assertTrue(raftBehavior instanceof Leader);
186 assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
188 // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
189 // one since lastApplied state is 0.
190 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
191 leaderActor, ApplyState.class);
192 assertEquals("ApplyState count", newLogIndex, applyStateList.size());
// The i-th collected ApplyState is expected to reference log index i + 1 in the current term.
194 for(int i = 0; i <= newLogIndex - 1; i++ ) {
195 ApplyState applyState = applyStateList.get(i);
196 assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
197 assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
// The final ApplyState corresponds to the entry replicated in this test.
200 ApplyState last = applyStateList.get((int) newLogIndex - 1);
201 assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
202 assertEquals("getIdentifier", "state-id", last.getIdentifier());
/**
 * Checks what the leader sends on a heartbeat while a snapshot install to the follower is pending.
 * <p>
 * A FollowerToSnapshot is registered for the follower by hand. While no reply for the chunk has
 * been recorded, a SendHeartBeat must result in an AppendEntries with no entries. Once the chunk
 * is marked as successfully sent, the next SendHeartBeat must result in an InstallSnapshot whose
 * lastIncludedIndex equals the snapshot index.
 */
206 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
207 logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
209 MockRaftActorContext actorContext = createActorContextWithFollower();
// Small map that is serialized with toByteString and used as the leader's snapshot payload.
211 Map<String, String> leadersSnapshot = new HashMap<>();
212 leadersSnapshot.put("1", "A");
213 leadersSnapshot.put("2", "B");
214 leadersSnapshot.put("3", "C");
// Empty the leader's log from index 0 before the snapshot index/term are set below.
217 actorContext.getReplicatedLog().removeFrom(0);
219 final int followersLastIndex = 2;
220 final int snapshotIndex = 3;
221 final int newEntryIndex = 4;
222 final int snapshotTerm = 1;
223 final int currentTerm = 2;
225 // set the snapshot variables in replicatedlog
226 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
227 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
228 actorContext.setCommitIndex(followersLastIndex);
229 //set follower timeout to 2 mins, helps during debugging
230 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
232 leader = new Leader(actorContext);
// NOTE(review): `entry` is not referenced again in the visible lines of this test.
235 ReplicatedLogImplEntry entry =
236 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
237 new MockRaftActorContext.MockPayload("D"));
239 //update follower timestamp
240 leader.markFollowerActive(FOLLOWER_ID);
// Install the snapshot and a per-follower FollowerToSnapshot directly on the leader.
242 ByteString bs = toByteString(leadersSnapshot);
243 leader.setSnapshot(Optional.of(bs));
244 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
245 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
247 //send first chunk and no InstallSnapshotReply received yet
249 fts.incrementChunkIndex();
251 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
252 TimeUnit.MILLISECONDS);
254 leader.handleMessage(leaderActor, new SendHeartBeat());
256 AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// The captured message is passed through SerializationUtils.fromSerializable before being inspected.
258 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
260 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
262 //InstallSnapshotReply received
263 fts.markSendStatus(true);
265 leader.handleMessage(leaderActor, new SendHeartBeat());
267 InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
269 assertEquals(snapshotIndex, is.getLastIncludedIndex());
/**
 * Verifies that replicating an entry while the leader's log only starts at a snapshot
 * (snapshot index 3, commit index 2) keeps the behavior a Leader and causes a CaptureSnapshot
 * message to be delivered to the leader actor.
 */
273 public void testSendAppendEntriesSnapshotScenario() throws Exception {
274 logStart("testSendAppendEntriesSnapshotScenario");
276 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): `leadersSnapshot` is populated but not referenced again in the visible lines.
278 Map<String, String> leadersSnapshot = new HashMap<>();
279 leadersSnapshot.put("1", "A");
280 leadersSnapshot.put("2", "B");
281 leadersSnapshot.put("3", "C");
// Empty the leader's log from index 0 before the snapshot index/term are set below.
284 actorContext.getReplicatedLog().removeFrom(0);
286 final int followersLastIndex = 2;
287 final int snapshotIndex = 3;
288 final int newEntryIndex = 4;
289 final int snapshotTerm = 1;
290 final int currentTerm = 2;
292 // set the snapshot variables in replicatedlog
293 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
294 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
295 actorContext.setCommitIndex(followersLastIndex);
297 leader = new Leader(actorContext);
299 // Leader will send an immediate heartbeat - ignore it.
300 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Entry to replicate, positioned just after the snapshot index.
303 ReplicatedLogImplEntry entry =
304 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
305 new MockRaftActorContext.MockPayload("D"));
307 //update follower timestamp
308 leader.markFollowerActive(FOLLOWER_ID);
310 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
311 RaftActorBehavior raftBehavior = leader.handleMessage(
312 leaderActor, new Replicate(null, "state-id", entry));
314 assertTrue(raftBehavior instanceof Leader);
// The observable effect asserted here is a CaptureSnapshot arriving at the leader actor.
316 MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
/**
 * Verifies that, when the leader holds no snapshot (explicitly set to absent), replicating an
 * entry triggers a CaptureSnapshot with install-snapshot-initiated set and the expected
 * last-applied (3 / term 1) and last-log (4 / term 2) coordinates, and that a second Replicate
 * while the first capture is still in progress does not produce another CaptureSnapshot.
 */
320 public void testInitiateInstallSnapshot() throws Exception {
321 logStart("testInitiateInstallSnapshot");
323 MockRaftActorContext actorContext = createActorContextWithFollower();
// NOTE(review): `leadersSnapshot` is populated but not referenced again in the visible lines.
325 Map<String, String> leadersSnapshot = new HashMap<>();
326 leadersSnapshot.put("1", "A");
327 leadersSnapshot.put("2", "B");
328 leadersSnapshot.put("3", "C");
// Empty the leader's log from index 0 before the snapshot index/term are set below.
331 actorContext.getReplicatedLog().removeFrom(0);
333 final int followersLastIndex = 2;
334 final int snapshotIndex = 3;
335 final int newEntryIndex = 4;
336 final int snapshotTerm = 1;
337 final int currentTerm = 2;
339 // set the snapshot variables in replicatedlog
340 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
341 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
342 actorContext.setLastApplied(3);
343 actorContext.setCommitIndex(followersLastIndex);
345 leader = new Leader(actorContext);
347 // Leader will send an immediate heartbeat - ignore it.
348 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
350 // set the snapshot as absent and check if capture-snapshot is invoked.
351 leader.setSnapshot(Optional.<ByteString>absent());
// Entry at index 4 / term 2; these are the values later asserted via getLastIndex()/getLastTerm().
354 ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
355 new MockRaftActorContext.MockPayload("D"));
357 actorContext.getReplicatedLog().append(entry);
359 //update follower timestamp
360 leader.markFollowerActive(FOLLOWER_ID);
362 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
364 CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
366 assertTrue(cs.isInstallSnapshotInitiated());
367 assertEquals(3, cs.getLastAppliedIndex());
368 assertEquals(1, cs.getLastAppliedTerm());
369 assertEquals(4, cs.getLastIndex());
370 assertEquals(2, cs.getLastTerm());
372 // if an initiate is started again when first is in progress, it shouldnt initiate Capture
373 leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
375 List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
376 assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
/**
 * Verifies that handling SendInstallSnapshot keeps the behavior a Leader and sends the follower
 * an InstallSnapshot with non-null data, the log's snapshot index and term as
 * lastIncludedIndex/lastIncludedTerm, and the current term.
 */
380 public void testInstallSnapshot() throws Exception {
381 logStart("testInstallSnapshot");
383 MockRaftActorContext actorContext = createActorContextWithFollower();
// Small map that is serialized with toByteString and sent as the snapshot payload.
385 Map<String, String> leadersSnapshot = new HashMap<>();
386 leadersSnapshot.put("1", "A");
387 leadersSnapshot.put("2", "B");
388 leadersSnapshot.put("3", "C");
// Empty the leader's log from index 0 before the snapshot index/term are set below.
391 actorContext.getReplicatedLog().removeFrom(0);
393 final int followersLastIndex = 2;
394 final int snapshotIndex = 3;
395 final int snapshotTerm = 1;
396 final int currentTerm = 2;
398 // set the snapshot variables in replicatedlog
399 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
400 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
401 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
402 actorContext.setCommitIndex(followersLastIndex);
404 leader = new Leader(actorContext);
406 // Ignore initial heartbeat.
407 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
409 RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
410 new SendInstallSnapshot(toByteString(leadersSnapshot)));
412 assertTrue(raftBehavior instanceof Leader);
414 // check if installsnapshot gets called with the correct values.
416 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
418 assertNotNull(installSnapshot.getData());
419 assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
420 assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
422 assertEquals(currentTerm, installSnapshot.getTerm());
/**
 * Verifies the leader's bookkeeping after a successful InstallSnapshotReply for the last chunk.
 * <p>
 * A FollowerToSnapshot is advanced until its current chunk is the last one, then the reply is
 * handled. Afterwards the leader must hold no follower snapshots, still track one follower, and
 * that follower's matchIndex must equal the snapshot index with nextIndex one past it.
 */
426 public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
427 logStart("testHandleInstallSnapshotReplyLastChunk");
429 MockRaftActorContext actorContext = createActorContextWithFollower();
431 final int followersLastIndex = 2;
432 final int snapshotIndex = 3;
433 final int snapshotTerm = 1;
434 final int currentTerm = 2;
436 actorContext.setCommitIndex(followersLastIndex);
438 leader = new Leader(actorContext);
440 // Ignore initial heartbeat.
441 MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Small map that is serialized with toByteString and used as the leader's snapshot payload.
443 Map<String, String> leadersSnapshot = new HashMap<>();
444 leadersSnapshot.put("1", "A");
445 leadersSnapshot.put("2", "B");
446 leadersSnapshot.put("3", "C");
448 // set the snapshot variables in replicatedlog
450 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
451 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
452 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
454 ByteString bs = toByteString(leadersSnapshot);
455 leader.setSnapshot(Optional.of(bs));
456 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
457 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Advance the chunk index until the current chunk is reported as the last one.
// NOTE(review): part of this loop body is not visible in this excerpt.
458 while(!fts.isLastChunk(fts.getChunkIndex())) {
460 fts.incrementChunkIndex();
// Empty the leader's log from index 0 before the reply is handled.
464 actorContext.getReplicatedLog().removeFrom(0);
466 RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
467 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
469 assertTrue(raftBehavior instanceof Leader);
471 assertEquals(0, leader.followerSnapshotSize());
472 assertEquals(1, leader.followerLogSize());
473 FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
// NOTE(review): the next two assertions are identical; the second adds no coverage.
475 assertEquals(snapshotIndex, fli.getMatchIndex());
476 assertEquals(snapshotIndex, fli.getMatchIndex());
477 assertEquals(snapshotIndex + 1, fli.getNextIndex());
/**
 * Verifies that each successful InstallSnapshotReply makes the leader send the next snapshot
 * chunk, and that no further chunk is sent once the reply for the final chunk has been handled.
 * <p>
 * The snapshot is expected to be split into 3 chunks. Chunks 1 and 2 are checked by index; a
 * third InstallSnapshot must arrive, and after its reply no InstallSnapshot may be collected.
 */
481 public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
482 logStart("testSendSnapshotfromInstallSnapshotReply");
484 MockRaftActorContext actorContext = createActorContextWithFollower();
486 final int followersLastIndex = 2;
487 final int snapshotIndex = 3;
488 final int snapshotTerm = 1;
489 final int currentTerm = 2;
// Config params with an overridden snapshot chunk size.
// NOTE(review): the overriding method's body (the chunk size value) is not visible in this excerpt.
491 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
493 public int getSnapshotChunkSize() {
// Heartbeat and isolated-leader-check intervals are set to 9 s and 10 s respectively.
497 configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
498 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
500 actorContext.setConfigParams(configParams);
501 actorContext.setCommitIndex(followersLastIndex);
503 leader = new Leader(actorContext);
// Small map that is serialized with toByteString and used as the leader's snapshot payload.
505 Map<String, String> leadersSnapshot = new HashMap<>();
506 leadersSnapshot.put("1", "A");
507 leadersSnapshot.put("2", "B");
508 leadersSnapshot.put("3", "C");
510 // set the snapshot variables in replicatedlog
511 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
512 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
513 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
515 ByteString bs = toByteString(leadersSnapshot);
516 leader.setSnapshot(Optional.of(bs));
518 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
// First chunk.
520 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
522 assertEquals(1, installSnapshot.getChunkIndex());
523 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 1; the collector is cleared first so the next match is the new message.
525 followerActor.underlyingActor().clear();
526 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
527 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
529 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
531 assertEquals(2, installSnapshot.getChunkIndex());
532 assertEquals(3, installSnapshot.getTotalChunks());
// Acknowledge chunk 2 and wait for the next InstallSnapshot (its chunk index is not asserted).
534 followerActor.underlyingActor().clear();
535 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
536 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
538 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
540 // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
541 followerActor.underlyingActor().clear();
542 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
543 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
// getFirstMatching (rather than expectFirstMatching) is used because the expected result is null.
545 installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
547 Assert.assertNull(installSnapshot);
/**
 * Verifies that an unsuccessful InstallSnapshotReply carrying chunk index -1 makes the leader
 * resend the first chunk: after the reply and a subsequent SendHeartBeat, the follower must
 * again receive an InstallSnapshot with chunk index 1 of 3.
 */
552 public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
553 logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
555 MockRaftActorContext actorContext = createActorContextWithFollower();
557 final int followersLastIndex = 2;
558 final int snapshotIndex = 3;
559 final int snapshotTerm = 1;
560 final int currentTerm = 2;
// Config params with an overridden snapshot chunk size.
// NOTE(review): the overriding method's body (the chunk size value) is not visible in this excerpt.
562 actorContext.setConfigParams(new DefaultConfigParamsImpl(){
564 public int getSnapshotChunkSize() {
569 actorContext.setCommitIndex(followersLastIndex);
571 leader = new Leader(actorContext);
// Small map that is serialized with toByteString and used as the leader's snapshot payload.
573 Map<String, String> leadersSnapshot = new HashMap<>();
574 leadersSnapshot.put("1", "A");
575 leadersSnapshot.put("2", "B");
576 leadersSnapshot.put("3", "C");
578 // set the snapshot variables in replicatedlog
579 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
580 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
581 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
583 ByteString bs = toByteString(leadersSnapshot);
584 leader.setSnapshot(Optional.of(bs));
// NOTE(review): the reason for this one-second pause is not evident from the visible code.
586 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
587 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
589 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
591 assertEquals(1, installSnapshot.getChunkIndex());
592 assertEquals(3, installSnapshot.getTotalChunks());
594 followerActor.underlyingActor().clear();
// Simulate a failed reply with an invalid chunk index (-1, success == false).
596 leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
597 FOLLOWER_ID, -1, false));
599 Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
600 TimeUnit.MILLISECONDS);
602 leader.handleMessage(leaderActor, new SendHeartBeat());
// The resent InstallSnapshot must be the first chunk again.
604 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
606 assertEquals(1, installSnapshot.getChunkIndex());
607 assertEquals(3, installSnapshot.getTotalChunks());
/**
 * Verifies the last-chunk hash code carried by consecutive InstallSnapshot messages: the first
 * chunk must carry AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, and the second chunk must carry
 * the hashCode() of the first chunk's data.
 */
611 public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
612 logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
614 MockRaftActorContext actorContext = createActorContextWithFollower();
616 final int followersLastIndex = 2;
617 final int snapshotIndex = 3;
618 final int snapshotTerm = 1;
619 final int currentTerm = 2;
// Config params with an overridden snapshot chunk size.
// NOTE(review): the overriding method's body (the chunk size value) is not visible in this excerpt.
621 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
623 public int getSnapshotChunkSize() {
628 actorContext.setCommitIndex(followersLastIndex);
630 leader = new Leader(actorContext);
// Small map that is serialized with toByteString and used as the leader's snapshot payload.
632 Map<String, String> leadersSnapshot = new HashMap<>();
633 leadersSnapshot.put("1", "A");
634 leadersSnapshot.put("2", "B");
635 leadersSnapshot.put("3", "C");
637 // set the snapshot variables in replicatedlog
638 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
639 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
640 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
642 ByteString bs = toByteString(leadersSnapshot);
643 leader.setSnapshot(Optional.of(bs));
645 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
647 InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
649 assertEquals(1, installSnapshot.getChunkIndex());
650 assertEquals(3, installSnapshot.getTotalChunks());
651 assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
// Remember the hash of chunk 1's data; chunk 2 must report it as its last-chunk hash code.
653 int hashCode = installSnapshot.getData().hashCode();
655 followerActor.underlyingActor().clear();
// Acknowledge chunk 1 so the leader moves on to chunk 2.
657 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
658 FOLLOWER_ID, 1, true));
660 installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
662 assertEquals(2, installSnapshot.getChunkIndex());
663 assertEquals(3, installSnapshot.getTotalChunks());
664 assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
/**
 * Exercises FollowerToSnapshot chunking directly: walks the serialized snapshot in steps of 50
 * bytes and checks, for each step, the size of the chunk returned by getNextChunk() and the
 * current chunk index, then checks the reported total number of chunks.
 * <p>
 * NOTE(review): several lines of this method (including the declarations of {@code chunkIndex}
 * and {@code j}) are not visible in this excerpt, so the per-iteration bookkeeping is only
 * partially documented here.
 */
668 public void testFollowerToSnapshotLogic() {
669 logStart("testFollowerToSnapshotLogic");
671 MockRaftActorContext actorContext = createActorContext();
// Config params with an overridden snapshot chunk size (method body not visible in this excerpt).
673 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
675 public int getSnapshotChunkSize() {
680 leader = new Leader(actorContext);
// Small map that is serialized with toByteString and used as the snapshot payload.
682 Map<String, String> leadersSnapshot = new HashMap<>();
683 leadersSnapshot.put("1", "A");
684 leadersSnapshot.put("2", "B");
685 leadersSnapshot.put("3", "C");
687 ByteString bs = toByteString(leadersSnapshot);
688 byte[] barray = bs.toByteArray();
690 FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
691 leader.setFollowerSnapshot(FOLLOWER_ID, fts);
// Sanity check: the byte array copy has the same length as the ByteString.
693 assertEquals(bs.size(), barray.length);
// i is the start offset of the current 50-byte window over the snapshot bytes.
696 for (int i=0; i < barray.length; i = i + 50) {
// Special-cases the final window, which may extend past the end of the array.
700 if (i + 50 > barray.length) {
// The chunk handed out must be exactly j - i bytes long.
704 ByteString chunk = fts.getNextChunk();
705 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
706 assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
// Mark the chunk as sent and advance, unless this was the last chunk.
708 fts.markSendStatus(true);
709 if (!fts.isLastChunk(chunkIndex)) {
710 fts.incrementChunkIndex();
714 assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
/**
 * Creates the behavior under test for the given context.
 *
 * @param actorContext the Raft actor context the behavior is constructed with
 * @return a new Leader built on {@code actorContext}
 */
717 @Override protected RaftActorBehavior createBehavior(
718 RaftActorContext actorContext) {
719 return new Leader(actorContext);
/**
 * Creates a context for the leader, using {@code leaderActor} as its actor.
 *
 * @return the result of {@code createActorContext(leaderActor)}
 */
723 protected MockRaftActorContext createActorContext() {
724 return createActorContext(leaderActor);
/**
 * Creates a context with the id "leader" for the given actor.
 *
 * @param actorRef the actor the context is created for
 * @return the result of {@code createActorContext("leader", actorRef)}
 */
728 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
729 return createActorContext("leader", actorRef);
/**
 * Creates the leader's context and registers a single peer: FOLLOWER_ID mapped to the path of
 * {@code followerActor}.
 * NOTE(review): the return statement is not visible in this excerpt; presumably the configured
 * context is returned.
 */
732 private MockRaftActorContext createActorContextWithFollower() {
733 MockRaftActorContext actorContext = createActorContext();
734 actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
735 followerActor.path().toString()).build());
/**
 * Builds a MockRaftActorContext for the given id and actor, configured with a 50 ms heartbeat
 * interval and an election timeout factor of 100000.
 * NOTE(review): the return statement is not visible in this excerpt; presumably {@code context}
 * is returned.
 *
 * @param id the id passed to the MockRaftActorContext constructor
 * @param actorRef the actor passed to the MockRaftActorContext constructor
 */
739 private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
740 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
741 configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
742 configParams.setElectionTimeoutFactor(100000);
743 MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
744 context.setConfigParams(configParams);
/**
 * Runs a Leader against a real Follower behavior when both have logs built the same way and the
 * same commit index (1), which is lower than the last log index.
 * <p>
 * The initial AppendEntries must advertise leaderCommit 1 with no entries and prevLogIndex 0, and
 * the follower's AppendEntriesReply collected by the leader actor must report last index 2 in
 * term 1.
 */
749 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
750 logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
752 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
754 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a Follower behavior on the follower actor.
756 Follower follower = new Follower(followerActorContext);
757 followerActor.underlyingActor().setBehavior(follower);
759 Map<String, String> peerAddresses = new HashMap<>();
760 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
762 leaderActorContext.setPeerAddresses(peerAddresses);
// Replace the leader's log with one built by createEntries(0, 3, 1).
764 leaderActorContext.getReplicatedLog().removeFrom(0);
767 leaderActorContext.setReplicatedLog(
768 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
770 leaderActorContext.setCommitIndex(1);
772 followerActorContext.getReplicatedLog().removeFrom(0);
774 // follower too has the exact same log entries and has the same commit index
775 followerActorContext.setReplicatedLog(
776 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
778 followerActorContext.setCommitIndex(1);
780 leader = new Leader(leaderActorContext);
782 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
784 assertEquals(1, appendEntries.getLeaderCommit());
785 assertEquals(0, appendEntries.getEntries().size());
786 assertEquals(0, appendEntries.getPrevLogIndex());
// The follower's reply is collected at the leader actor.
788 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
789 leaderActor, AppendEntriesReply.class);
791 assertEquals(2, appendEntriesReply.getLogLastIndex());
792 assertEquals(1, appendEntriesReply.getLogLastTerm());
794 // NOTE(review): the two assertions below repeat the pair above (log last index/term), not a "next index".
795 assertEquals(2, appendEntriesReply.getLogLastIndex());
796 assertEquals(1, appendEntriesReply.getLogLastTerm());
/**
 * Runs a Leader against a real Follower behavior when both have logs built the same way but the
 * follower's commit index (2) is ahead of the leader's (1).
 * <p>
 * The initial AppendEntries must advertise leaderCommit 1 and prevLogIndex 0. After the
 * follower's reply is handled by the leader, the AppendEntries triggered by the next
 * SendHeartBeat must advertise leaderCommit 2 and prevLogIndex 2, the follower must again reply
 * with last index 2 / term 1, and the follower's commit index must still be 2.
 */
802 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
803 logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
805 MockRaftActorContext leaderActorContext = createActorContext();
807 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
// Install a Follower behavior on the follower actor.
809 Follower follower = new Follower(followerActorContext);
810 followerActor.underlyingActor().setBehavior(follower);
812 Map<String, String> peerAddresses = new HashMap<>();
813 peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
815 leaderActorContext.setPeerAddresses(peerAddresses);
// Replace the leader's log with one built by createEntries(0, 3, 1).
817 leaderActorContext.getReplicatedLog().removeFrom(0);
819 leaderActorContext.setReplicatedLog(
820 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
822 leaderActorContext.setCommitIndex(1);
824 followerActorContext.getReplicatedLog().removeFrom(0);
826 followerActorContext.setReplicatedLog(
827 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
829 // follower has the same log entries but its commit index > leaders commit index
830 followerActorContext.setCommitIndex(2);
832 leader = new Leader(leaderActorContext);
835 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
837 assertEquals(1, appendEntries.getLeaderCommit());
838 assertEquals(0, appendEntries.getEntries().size());
839 assertEquals(0, appendEntries.getPrevLogIndex());
841 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
842 leaderActor, AppendEntriesReply.class);
844 assertEquals(2, appendEntriesReply.getLogLastIndex());
845 assertEquals(1, appendEntriesReply.getLogLastTerm());
// NOTE(review): this installs the Follower behavior on the *leader* actor - confirm this is intended.
847 leaderActor.underlyingActor().setBehavior(follower);
// Feed the follower's reply into the leader explicitly.
848 leader.handleMessage(followerActor, appendEntriesReply);
// Clear both collectors so the assertions below only see traffic from the next heartbeat.
850 leaderActor.underlyingActor().clear();
851 followerActor.underlyingActor().clear();
853 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
854 TimeUnit.MILLISECONDS);
856 leader.handleMessage(leaderActor, new SendHeartBeat());
858 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
860 assertEquals(2, appendEntries.getLeaderCommit());
861 assertEquals(0, appendEntries.getEntries().size());
862 assertEquals(2, appendEntries.getPrevLogIndex());
864 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
866 assertEquals(2, appendEntriesReply.getLogLastIndex());
867 assertEquals(1, appendEntriesReply.getLogLastTerm());
869 assertEquals(2, followerActorContext.getCommitIndex());
875 public void testHandleAppendEntriesReplyFailure(){
876 logStart("testHandleAppendEntriesReplyFailure");
878 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
880 leader = new Leader(leaderActorContext);
882 // Send initial heartbeat reply with last index.
883 leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
885 FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
886 assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
888 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
890 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
892 assertEquals(RaftState.Leader, raftActorBehavior.state());
894 assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
898 public void testHandleAppendEntriesReplySuccess() throws Exception {
899 logStart("testHandleAppendEntriesReplySuccess");
901 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
903 leaderActorContext.setReplicatedLog(
904 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
906 leaderActorContext.setCommitIndex(1);
907 leaderActorContext.setLastApplied(1);
908 leaderActorContext.getTermInformation().update(1, "leader");
910 leader = new Leader(leaderActorContext);
912 AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
914 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
916 assertEquals(RaftState.Leader, raftActorBehavior.state());
918 assertEquals(2, leaderActorContext.getCommitIndex());
920 ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
921 leaderActor, ApplyJournalEntries.class);
923 assertEquals(2, leaderActorContext.getLastApplied());
925 assertEquals(2, applyJournalEntries.getToIndex());
927 List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
930 assertEquals(1,applyStateList.size());
932 ApplyState applyState = applyStateList.get(0);
934 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
938 public void testHandleAppendEntriesReplyUnknownFollower(){
939 logStart("testHandleAppendEntriesReplyUnknownFollower");
941 MockRaftActorContext leaderActorContext = createActorContext();
943 leader = new Leader(leaderActorContext);
945 AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
947 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
949 assertEquals(RaftState.Leader, raftActorBehavior.state());
953 public void testHandleRequestVoteReply(){
954 logStart("testHandleRequestVoteReply");
956 MockRaftActorContext leaderActorContext = createActorContext();
958 leader = new Leader(leaderActorContext);
960 // Should be a no-op.
961 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
962 new RequestVoteReply(1, true));
964 assertEquals(RaftState.Leader, raftActorBehavior.state());
966 raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
968 assertEquals(RaftState.Leader, raftActorBehavior.state());
972 public void testIsolatedLeaderCheckNoFollowers() {
973 logStart("testIsolatedLeaderCheckNoFollowers");
975 MockRaftActorContext leaderActorContext = createActorContext();
977 leader = new Leader(leaderActorContext);
978 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
979 Assert.assertTrue(behavior instanceof Leader);
983 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
984 logStart("testIsolatedLeaderCheckTwoFollowers");
986 new JavaTestKit(getSystem()) {{
988 ActorRef followerActor1 = getTestActor();
989 ActorRef followerActor2 = getTestActor();
991 MockRaftActorContext leaderActorContext = createActorContext();
993 Map<String, String> peerAddresses = new HashMap<>();
994 peerAddresses.put("follower-1", followerActor1.path().toString());
995 peerAddresses.put("follower-2", followerActor2.path().toString());
997 leaderActorContext.setPeerAddresses(peerAddresses);
999 leader = new Leader(leaderActorContext);
1001 leader.markFollowerActive("follower-1");
1002 leader.markFollowerActive("follower-2");
1003 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1004 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1005 behavior instanceof Leader);
1007 // kill 1 follower and verify if that got killed
1008 final JavaTestKit probe = new JavaTestKit(getSystem());
1009 probe.watch(followerActor1);
1010 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1011 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1012 assertEquals(termMsg1.getActor(), followerActor1);
1014 leader.markFollowerInActive("follower-1");
1015 leader.markFollowerActive("follower-2");
1016 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1017 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1018 behavior instanceof Leader);
1020 // kill 2nd follower and leader should change to Isolated leader
1021 followerActor2.tell(PoisonPill.getInstance(), null);
1022 probe.watch(followerActor2);
1023 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1024 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1025 assertEquals(termMsg2.getActor(), followerActor2);
1027 leader.markFollowerInActive("follower-2");
1028 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1029 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1030 behavior instanceof IsolatedLeader);
1036 public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1037 logStart("testAppendEntryCallAtEndofAppendEntryReply");
1039 MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1041 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1042 //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1043 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1045 leaderActorContext.setConfigParams(configParams);
1047 MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1049 followerActorContext.setConfigParams(configParams);
1051 Follower follower = new Follower(followerActorContext);
1052 followerActor.underlyingActor().setBehavior(follower);
1054 leaderActorContext.getReplicatedLog().removeFrom(0);
1055 leaderActorContext.setCommitIndex(-1);
1056 leaderActorContext.setLastApplied(-1);
1058 followerActorContext.getReplicatedLog().removeFrom(0);
1059 followerActorContext.setCommitIndex(-1);
1060 followerActorContext.setLastApplied(-1);
1062 leader = new Leader(leaderActorContext);
1064 AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1065 leaderActor, AppendEntriesReply.class);
1067 leader.handleMessage(followerActor, appendEntriesReply);
1069 // Clear initial heartbeat messages
1071 leaderActor.underlyingActor().clear();
1072 followerActor.underlyingActor().clear();
1075 leaderActorContext.setReplicatedLog(
1076 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1077 leaderActorContext.setCommitIndex(1);
1078 leaderActorContext.setLastApplied(1);
1080 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1081 TimeUnit.MILLISECONDS);
1083 leader.handleMessage(leaderActor, new SendHeartBeat());
1085 AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1087 // Should send first log entry
1088 assertEquals(1, appendEntries.getLeaderCommit());
1089 assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1090 assertEquals(-1, appendEntries.getPrevLogIndex());
1092 appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1094 assertEquals(1, appendEntriesReply.getLogLastTerm());
1095 assertEquals(0, appendEntriesReply.getLogLastIndex());
1097 followerActor.underlyingActor().clear();
1099 leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1101 appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1103 // Should send second log entry
1104 assertEquals(1, appendEntries.getLeaderCommit());
1105 assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1111 public void testLaggingFollowerStarvation() throws Exception {
1112 logStart("testLaggingFollowerStarvation");
1113 new JavaTestKit(getSystem()) {{
1114 String leaderActorId = actorFactory.generateActorId("leader");
1115 String follower1ActorId = actorFactory.generateActorId("follower");
1116 String follower2ActorId = actorFactory.generateActorId("follower");
1118 TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1119 actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1120 ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1121 ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1123 MockRaftActorContext leaderActorContext =
1124 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1126 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1127 configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1128 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1130 leaderActorContext.setConfigParams(configParams);
1132 leaderActorContext.setReplicatedLog(
1133 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1135 Map<String, String> peerAddresses = new HashMap<>();
1136 peerAddresses.put(follower1ActorId,
1137 follower1Actor.path().toString());
1138 peerAddresses.put(follower2ActorId,
1139 follower2Actor.path().toString());
1141 leaderActorContext.setPeerAddresses(peerAddresses);
1142 leaderActorContext.getTermInformation().update(1, leaderActorId);
1144 RaftActorBehavior leader = createBehavior(leaderActorContext);
1146 leaderActor.underlyingActor().setBehavior(leader);
1148 for(int i=1;i<6;i++) {
1149 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1150 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1151 assertTrue(newBehavior == leader);
1152 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1155 // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1156 List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1158 assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1159 heartbeats.size() > 1);
1161 // Check if follower-2 got AppendEntries during this time and was not starved
1162 List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1164 assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1165 appendEntries.size() > 1);
1171 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1172 ActorRef actorRef, RaftRPC rpc) throws Exception {
1173 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1174 assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1177 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1179 private final long electionTimeOutIntervalMillis;
1180 private final int snapshotChunkSize;
1182 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1184 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1185 this.snapshotChunkSize = snapshotChunkSize;
1189 public FiniteDuration getElectionTimeOutInterval() {
1190 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1194 public int getSnapshotChunkSize() {
1195 return snapshotChunkSize;