import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
sendHeartBeat();
return this;
- } else if(message instanceof InitiateInstallSnapshot) {
- installSnapshotIfNeeded();
-
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
followerNextIndex, leaderSnapShotIndex, leaderLastIndex
);
}
- actor().tell(new InitiateInstallSnapshot(), actor());
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ initiateCaptureSnapshot(followerId, followerNextIndex);
+
} else if(sendHeartbeat) {
//we send an AppendEntries, even if the follower is inactive
// in-order to update the followers timestamp, in case it becomes active again
}
/**
- * An installSnapshot is scheduled at a interval that is a multiple of
- * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
- * snapshots at every heartbeat.
- *
+ * /**
* Install Snapshot works as follows
- * 1. Leader sends a InitiateInstallSnapshot message to self
- * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
- * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
+ * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
+ * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
* and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
- * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
- * 5. On complete, Follower sends back a InstallSnapshotReply.
- * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
+ * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
+ * 4. On complete, Follower sends back a InstallSnapshotReply.
+ * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
* and replenishes the memory by deleting the snapshot in Replicated log.
- *
+ * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
+ * then send the existing snapshot in chunks to the follower.
+ * @param followerId
+ * @param followerNextIndex
*/
- private void installSnapshotIfNeeded() {
+ private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+ LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet());
}
- for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
- final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
-
- if (followerActor != null) {
- long nextIndex = e.getValue().getNextIndex();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot
- sendSnapshotChunk(followerActor, e.getKey());
-
- } else if (!context.isSnapshotCaptureInitiated()) {
- initiateCaptureSnapshot();
- //we just need 1 follower who would need snapshot to be installed.
- // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
- // who needs an install and send to all who need
- break;
- }
+ if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
+ context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
+ if (snapshot.isPresent()) {
+ // if a snapshot is present in the memory, most likely another install is in progress
+ // no need to capture snapshot.
+ // This could happen if another follower needs an install when one is going on.
+ final ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ sendSnapshotChunk(followerActor, followerId);
+
+ } else if (!context.isSnapshotCaptureInitiated()) {
+
+ LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
+ ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
+ lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+ lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
}
- }
- }
- }
-
- // on every install snapshot, we try to capture the snapshot.
- // Once a capture is going on, another one issued will get ignored by RaftActor.
- private void initiateCaptureSnapshot() {
- LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+ boolean isInstallSnapshotInitiated = true;
+ actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
+ lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
+ actor());
+ context.setSnapshotCaptureInitiated(true);
+ }
}
-
- boolean isInstallSnapshotInitiated = true;
- actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
- lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
- actor());
- context.setSnapshotCaptureInitiated(true);
}
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
assertTrue(raftBehavior instanceof Leader);
- // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
+ // we might receive some heartbeat messages, so wait till we get CaptureSnapshot
Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
@Override
protected Boolean match(Object o) throws Exception {
- if (o instanceof InitiateInstallSnapshot) {
+ if (o instanceof CaptureSnapshot) {
return true;
}
return false;
}
}.get();
- boolean initiateInitiateInstallSnapshot = false;
+ boolean captureSnapshot = false;
for (Boolean b: matches) {
- initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
+ captureSnapshot = b | captureSnapshot;
}
- assertTrue(initiateInitiateInstallSnapshot);
+ assertTrue(captureSnapshot);
}};
}
ActorRef followerActor = getTestActor();
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
-
+ peerAddresses.put(followerActor.path().toString(), followerActor.path().toString());
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext(leaderActor);
+ MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(leaderActor);
actorContext.setPeerAddresses(peerAddresses);
Map<String, String> leadersSnapshot = new HashMap<>();
leader.setSnapshot(Optional.<ByteString>absent());
// new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
actorContext.getReplicatedLog().append(entry);
- // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+ //update follower timestamp
+ leader.markFollowerActive(followerActor.path().toString());
+
RaftActorBehavior raftBehavior = leader.handleMessage(
- leaderActor, new InitiateInstallSnapshot());
+ senderActor, new Replicate(null, "state-id", entry));
CaptureSnapshot cs = MessageCollectorActor.
getFirstMatching(leaderActor, CaptureSnapshot.class);
assertEquals(2, cs.getLastTerm());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
- raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
+ leader.handleMessage(senderActor, new Replicate(null, "state-id", entry));
List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());