import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
*
* @return Collection of follower IDs
*/
- protected final Collection<String> getFollowerIds() {
+ public final Collection<String> getFollowerIds() {
return followerToLog.keySet();
}
applyLogToStateMachine(context.getCommitIndex());
}
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
purgeInMemoryLog();
}
followerToSnapshot.markSendStatus(false);
}
- if (wasLastChunk && !context.isSnapshotCaptureInitiated()) {
+ if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
// Since the follower is now caught up try to purge the log.
purgeInMemoryLog();
} else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
if (followerActor != null) {
long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ boolean sendAppendEntries = false;
+ List<ReplicatedLogEntry> entries = Collections.EMPTY_LIST;
if (mapFollowerToSnapshot.get(followerId) != null) {
// if install snapshot is in process , then sent next chunk if possible
sendSnapshotChunk(followerActor, followerId);
} else if(sendHeartbeat) {
// we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ sendAppendEntries = true;
}
} else {
long leaderLastIndex = context.getReplicatedLog().lastIndex();
followerNextIndex, followerId);
// FIXME : Sending one entry at a time
- final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
-
+ if(followerLogInformation.okToReplicate()) {
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+ sendAppendEntries = true;
+ }
} else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
+ leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
// if the followers next index is not present in the leaders log, and
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
}
// Send heartbeat to follower whenever install snapshot is initiated.
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
-
+ sendAppendEntries = true;
initiateCaptureSnapshot(followerId, followerNextIndex);
} else if(sendHeartbeat) {
- //we send an AppendEntries, even if the follower is inactive
+ // we send an AppendEntries, even if the follower is inactive
// in-order to update the followers timestamp, in case it becomes active again
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ sendAppendEntries = true;
}
}
+
+ if(sendAppendEntries) {
+ sendAppendEntriesToFollower(followerActor, followerNextIndex,
+ entries, followerId);
+ }
}
}
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
sendSnapshotChunk(followerActor, followerId);
- } else if (!context.isSnapshotCaptureInitiated()) {
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
- }
-
- boolean isInstallSnapshotInitiated = true;
- long replicatedToAllIndex = super.getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-
- CaptureSnapshot captureSnapshot = new CaptureSnapshot(
- lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
- isInstallSnapshotInitiated);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId,
- captureSnapshot);
- }
-
- actor().tell(captureSnapshot, actor());
- context.setSnapshotCaptureInitiated(true);
+ } else {
+ context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
}
}
}