import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Test;
+import org.opendaylight.controller.cluster.messaging.MessageSlice;
+import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.yangtools.concepts.Identifier;
@Override
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
if (leader != null) {
leader.close();
}
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
return sendReplicate(actorContext, 1, index);
}
private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
- MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
+ }
+
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
actorContext.getReplicatedLog().append(newEntry);
- return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+ return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
}
@Test
actorContext.getReplicatedLog().append(newEntry);
final Identifier id = new MockIdentifier("state-id");
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new Replicate(leaderActor, id, newEntry, true));
// State should not change
assertTrue(raftBehavior instanceof Leader);
leaderActor, ApplyState.class);
assertEquals("ApplyState count", newLogIndex, applyStateList.size());
- for (int i = 0; i <= newLogIndex - 1; i++ ) {
+ for (int i = 0; i <= newLogIndex - 1; i++) {
ApplyState applyState = applyStateList.get(i);
assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
leader.markFollowerActive(FOLLOWER_ID);
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+ leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
- fts.setSnapshotBytes(bs);
+ fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
//send first chunk and no InstallSnapshotReply received yet
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
RaftActorBehavior raftBehavior = leader.handleMessage(
- leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+ leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
assertTrue(raftBehavior instanceof Leader);
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+ leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
- assertTrue(cs.isInstallSnapshotInitiated());
assertEquals(3, cs.getLastAppliedIndex());
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
assertEquals(2, cs.getLastTerm());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+ leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
actorContext.getReplicatedLog().removeFrom(0);
+ AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+ actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
+
leader = new Leader(actorContext);
actorContext.setCurrentBehavior(leader);
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
-
- assertTrue(cs.isInstallSnapshotInitiated());
assertEquals(3, cs.getLastAppliedIndex());
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
assertEquals(2, cs.getLastTerm());
- // if an initiate is started again when first is in progress, it should not initiate Capture
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+ assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
+ assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Sending Replicate message should not initiate another capture since the first is in progress.
+ leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+ // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+ // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
+ final byte[] bytes = new byte[]{1, 2, 3};
+ installSnapshotStream.get().get().write(bytes);
+ actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
+ Runtime.getRuntime().totalMemory());
+ MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
+ MessageCollectorActor.clearMessages(followerActor);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
}
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(0);
- Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
- Collections.<ReplicatedLogEntry>emptyList(),
- lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+ byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+ Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
assertTrue(raftBehavior instanceof Leader);
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
- Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
- Collections.<ReplicatedLogEntry>emptyList(),
- lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+ byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+ Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
assertTrue(raftBehavior instanceof Leader);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+ leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
- fts.setSnapshotBytes(bs);
+ fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
while (!fts.isLastChunk(fts.getChunkIndex())) {
fts.getNextChunk();
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm);
- leader.setSnapshot(snapshot);
+ Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
InstallSnapshot.class);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm);
- leader.setSnapshot(snapshot);
+ Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
InstallSnapshot.class);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm);
- leader.setSnapshot(snapshot);
+ Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
InstallSnapshot.class);
}
@Test
- public void testLeaderInstallSnapshotState() {
+ public void testLeaderInstallSnapshotState() throws IOException {
logStart("testLeaderInstallSnapshotState");
Map<String, String> leadersSnapshot = new HashMap<>();
byte[] barray = bs.toByteArray();
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
- fts.setSnapshotBytes(bs);
+ fts.setSnapshotBytes(ByteSource.wrap(barray));
assertEquals(bs.size(), barray.length);
}
assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
+ fts.close();
}
@Override
}
@Override
- protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+ protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
return createActorContext(LEADER_ID, actorRef);
}
- private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
+ private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
assertTrue("Expected Leader", newBehavior instanceof Leader);
}
- private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
+ private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
+ doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
}
+ @Test
+ public void testReplicationWithPayloadSizeThatExceedsThreshold() {
+ logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
+
+ final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
+ Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
+ new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
+ final MockRaftActorContext.MockPayload largePayload =
+ new MockRaftActorContext.MockPayload("large", serializedSize);
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(300, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Send initial heartbeat reply so follower is marked active
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send normal payload first to prime commit index.
+ final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+ sendReplicate(leaderActorContext, term, 0);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
+ assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
+ sendReplicate(leaderActorContext, term, 1, largePayload);
+
+ MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
+ assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
+
+ final Identifier slicingId = messageSlice.getIdentifier();
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
+
+ leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
+ messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
+
+ leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send another normal payload.
+
+ sendReplicate(leaderActorContext, term, 2);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
+ }
+
+ @Test
+ public void testLargePayloadSlicingExpiration() {
+ logStart("testLargePayloadSlicingExpiration");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Send initial heartbeat reply so follower is marked active
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
+ leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+ MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+
+ // Sleep for at least 3 * election timeout so the slicing state expires.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+
+ MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send an AppendEntriesReply - this should restart the slicing.
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ }
+
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
- ActorRef actorRef, RaftRPC rpc) throws Exception {
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+ final ActorRef actorRef, final RaftRPC rpc) {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
}
private final long electionTimeOutIntervalMillis;
private final int snapshotChunkSize;
- MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+ MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
super();
this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
this.snapshotChunkSize = snapshotChunkSize;