import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setCommitIndex(-1);
short payloadVersion = (short)5;
actorContext.setPayloadVersion(payloadVersion);
logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setCommitIndex(-1);
// The raft context is initialized with a couple log entries. However the commitIndex
// is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
ByteString bs = toByteString(leadersSnapshot);
leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
commitIndex, snapshotTerm, commitIndex, snapshotTerm));
- FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
//send first chunk and no InstallSnapshotReply received yet
leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
- AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
- AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+ AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
ByteString bs = toByteString(leadersSnapshot);
leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
commitIndex, snapshotTerm, commitIndex, snapshotTerm));
- FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
while(!fts.isLastChunk(fts.getChunkIndex())) {
fts.getNextChunk();
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
+ assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
+ installSnapshot.getLastChunkHashCode().get().intValue());
int hashCode = Arrays.hashCode(installSnapshot.getData());
ByteString bs = toByteString(leadersSnapshot);
byte[] barray = bs.toByteArray();
- FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
assertEquals(bs.size(), barray.length);
new FiniteDuration(1000, TimeUnit.SECONDS));
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
String nonVotingFollowerId = "nonvoting-follower";
TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
// Leader should force an election timeout
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
verify(mockTransferCohort).transferComplete();
}
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Leader should force an election timeout
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
verify(mockTransferCohort).transferComplete();
}
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
// Leader should force an election timeout
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
verify(mockTransferCohort).transferComplete();
}