X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=4382cfec3237075087837b5f1402cadab249188f;hb=fbeed3e25ef80495c67bfe1698daf44b88fb10ff;hp=5076a8a38f891c894b9e01fdc61db5c0d5fa11a9;hpb=95d3c7975a423951dcbdecfbfa4cb6b7a23591cc;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 5076a8a38f..4382cfec32 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,7 +14,8 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -34,8 +35,8 @@ import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -47,11 +48,13 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import scala.concurrent.duration.FiniteDuration; /** * The behavior of a RaftActor when it is in the Leader state - *
+ * + ** Leaders: *
* Install Snapshot works as follows
- * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
- * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
- * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
- * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
- * 4. On complete, Follower sends back a InstallSnapshotReply.
- * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
- * and replenishes the memory by deleting the snapshot in Replicated log.
- * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
- * then send the existing snapshot in chunks to the follower.
- * @param followerId
+ * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
+ * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
+ * the Leader's handleMessage with a SendInstallSnapshot message.
+ * 3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
+ * the Follower via InstallSnapshot messages.
+ * 4. For each chunk, the Follower sends back an InstallSnapshotReply.
+ * 5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
+ * follower.
+ * 6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
+ * then send the existing snapshot in chunks to the follower.
+ *
+ * @param followerId the id of the follower.
+ * @return true if capture was initiated, false otherwise.
*/
public boolean initiateCaptureSnapshot(String followerId) {
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot.
- // This could happen if another follower needs an install when one is going on.
+ FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
+ if (snapshotHolder.isPresent()) {
+ // If a snapshot is present in the memory, most likely another install is in progress no need to capture
+ // snapshot. This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
- sendSnapshotChunk(followerActor, followerId);
+
+ // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
+ sendSnapshotChunk(followerActor, followerLogInfo);
return true;
- } else {
- return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
- this.getReplicatedToAllIndex(), followerId);
}
+
+ boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
+ if (captureInitiated) {
+ followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+ context.getConfigParams().getSnapshotChunkSize(), logName()));
+ }
+
+ return captureInitiated;
}
- private boolean canInstallSnapshot(long nextIndex){
+ private boolean canInstallSnapshot(long nextIndex) {
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
- return nextIndex == -1 ||
- (!context.getReplicatedLog().isPresent(nextIndex)
- && context.getReplicatedLog().isInSnapshot(nextIndex));
+ return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex)
+ && context.getReplicatedLog().isInSnapshot(nextIndex);
}
private void sendInstallSnapshot() {
- LOG.debug("{}: sendInstallSnapshot", logName());
+ log.debug("{}: sendInstallSnapshot", logName());
for (Entry