import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
private Optional<ByteString> snapshot;
- private long replicatedToAllIndex = -1;
-
public AbstractLeader(RaftActorContext context) {
super(context);
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
// prevent election timeouts (§5.2)
- scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+ sendAppendEntries(0);
}
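// Note: the 0 passed above makes the election-time heartbeat immediate.
// Every follower trivially satisfies timeSinceLastActivity() >= 0, so an
// empty AppendEntries goes out to each follower right away, rather than
// waiting on the previously scheduled zero-delay heartbeat timer.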
}
private void purgeInMemoryLog() {
- //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+ //find the lowest index across followers which has been replicated to all.
+ // fall back to lastApplied if there are no followers, so that a single-node cluster keeps clearing its log
// we would delete the in-mem log from that index on, in order to minimize mem usage
// we would also share this info through AE with the followers so that they can delete their log entries as well.
- long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+ long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
for (FollowerLogInformation info : followerToLog.values()) {
minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
}
- replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+ super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
}
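// Worked example of the trimming math above, with hypothetical matchIndex
// values: followers at matchIndex 7, 5 and 9 give
// min(Long.MAX_VALUE, 7, 5, 9) = 5, so performSnapshotWithoutCapture(5)
// trims in-memory entries up to index 5. With no followers the starting
// value is context.getLastApplied(), so a single node keeps trimming too.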
@Override
sendHeartBeat();
return this;
- } else if(message instanceof InitiateInstallSnapshot) {
- installSnapshotIfNeeded();
-
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
followerLogInformation.markFollowerActive();
if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+ boolean wasLastChunk = false;
if (reply.isSuccess()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
// we can remove snapshot from the memory
setSnapshot(Optional.<ByteString>absent());
+ wasLastChunk = true;
} else {
followerToSnapshot.markSendStatus(true);
}
} else {
followerToSnapshot.markSendStatus(false);
}
+
+ if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if(followerActor != null) {
+ sendSnapshotChunk(followerActor, followerId);
+ }
+ }
+
} else {
LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
context.getId(), reply.getChunkIndex(), followerId,
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
} else {
- sendAppendEntries();
+ sendAppendEntries(0);
}
}
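// With no followers, consensus is immediate: the entry is committed and
// applied to the state machine directly. Otherwise sendAppendEntries(0)
// pushes the new entry to every follower at once, since a 0 threshold
// disables the repeat-send suppression used by the heartbeat path.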
- private void sendAppendEntries() {
+ private void sendAppendEntries(long timeSinceLastActivityInterval) {
// Send an AppendEntries to all followers
- long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
final FollowerLogInformation followerLogInformation = e.getValue();
// This check helps avoid sending a repeat message to the follower
- if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
+ if(!followerLogInformation.isFollowerActive() ||
+ followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
sendUpdatesToFollower(followerId, followerLogInformation, true);
}
}
sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
} else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex >= followerNextIndex) {
+ leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
// if the follower's next index is not present in the leader's log, and
// if the follower is not just starting up, and the leader's last index is greater than the follower's next index,
// then a snapshot should be sent
followerNextIndex, leaderSnapShotIndex, leaderLastIndex
);
}
- actor().tell(new InitiateInstallSnapshot(), actor());
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ initiateCaptureSnapshot(followerId, followerNextIndex);
+
} else if(sendHeartbeat) {
//we send an AppendEntries, even if the follower is inactive,
// in order to update the follower's timestamp, in case it becomes active again
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
prevLogIndex(followerNextIndex),
prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex(), replicatedToAllIndex);
+ context.getCommitIndex(), super.getReplicatedToAllIndex());
if(!entries.isEmpty()) {
LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
}
/**
- * An installSnapshot is scheduled at a interval that is a multiple of
- * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
- * snapshots at every heartbeat.
- *
* Install Snapshot works as follows
- * 1. Leader sends a InitiateInstallSnapshot message to self
- * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
- * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
+ * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to the actor
+ * 2. RaftActor, on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
* and makes a call to Leader's handleMessage with a SendInstallSnapshot message.
- * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
- * 5. On complete, Follower sends back a InstallSnapshotReply.
- * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
+ * 3. Leader picks the snapshot from the in-mem ReplicatedLog and sends it in chunks to the Follower
+ * 4. On completion, Follower sends back an InstallSnapshotReply.
+ * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
* and reclaims memory by deleting the snapshot held in the replicated log.
- *
+ * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply),
+ * then the existing snapshot is sent in chunks to that follower.
+ * (The chunk arithmetic is sketched after this method.)
+ * @param followerId the id of the follower whose log is being caught up
+ * @param followerNextIndex the next log index the leader would send to that follower
*/
- private void installSnapshotIfNeeded() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
- }
-
- for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
- final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
-
- if (followerActor != null) {
- long nextIndex = e.getValue().getNextIndex();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot
- sendSnapshotChunk(followerActor, e.getKey());
-
- } else {
- initiateCaptureSnapshot();
- //we just need 1 follower who would need snapshot to be installed.
- // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
- // who needs an install and send to all who need
- break;
- }
+ private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
+ if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
+ context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
+ if (snapshot.isPresent()) {
+ // if a snapshot is present in memory, most likely another install is in progress,
+ // so there is no need to capture a snapshot.
+ // This could happen if another follower needs an install while one is going on.
+ final ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ sendSnapshotChunk(followerActor, followerId);
+
+ } else if (!context.isSnapshotCaptureInitiated()) {
+
+ LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
+ ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
+ lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+ lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
}
- }
- }
- }
- // on every install snapshot, we try to capture the snapshot.
- // Once a capture is going on, another one issued will get ignored by RaftActor.
- private void initiateCaptureSnapshot() {
- LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+ boolean isInstallSnapshotInitiated = true;
+ long replicatedToAllIndex = super.getReplicatedToAllIndex();
+ ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+ actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
+ isInstallSnapshotInitiated), actor());
+ context.setSnapshotCaptureInitiated(true);
+ }
}
-
- boolean isInstallSnapshotInitiated = true;
- actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
- lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
- actor());
}
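// Sketch only, not part of this patch: the chunk arithmetic implied by step 3
// of the javadoc above, assuming a fixed configured chunk size and 1-based
// chunk indexes. For a snapshot of `size` bytes:
//
//     int totalChunks = (size / chunkSize) + ((size % chunkSize) > 0 ? 1 : 0);
//
// e.g. a 5 MB snapshot with a 2 MB chunk size travels as 3 chunks, and
// isLastChunk(chunkIndex) would hold once chunkIndex == totalChunks.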
followerToSnapshot.getTotalChunks());
}
} catch (IOException e) {
- LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
+ LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e);
}
}
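// The argument reordering above matters: SLF4J only prints the stack trace
// when the Throwable is passed as the last argument, whereas the old
// exception-first order follows Akka's LoggingAdapter convention and does
// not match the SLF4J Logger signatures.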
private void sendHeartBeat() {
if (!followerToLog.isEmpty()) {
- sendAppendEntries();
+ sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis());
}
}
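// The two call sites now differ only in the activity threshold they pass:
// the replicate path uses sendAppendEntries(0) to reach every follower
// immediately, while sendHeartBeat passes the configured heartbeat interval
// so active followers contacted within the last interval are skipped.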