X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=0188a6df1ac387e603962d838def314a7dab1c8d;hb=86e8e4a06b682aa772c834a2cef56d0596540e1b;hp=7c439f7d5c929910c7dd04f4504deb7dc81c589f;hpb=d3e310b940b60f6590f0e94a576aece95a055942;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 7c439f7d5c..0188a6df1a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -5,17 +5,18 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; +import com.google.common.io.ByteSource; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; +import java.io.ObjectOutputStream; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -24,34 +25,45 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalInt; import java.util.Queue; -import javax.annotation.Nullable; +import java.util.concurrent.TimeUnit; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.Payload; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import scala.concurrent.duration.FiniteDuration; /** - * The behavior of a RaftActor when it is in the Leader state - *
+ * The behavior of a RaftActor when it is in the Leader state. + * + ** Leaders: *
* Install Snapshot works as follows
- * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
- * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
- * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
- * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
- * 4. On complete, Follower sends back a InstallSnapshotReply.
- * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
- * and replenishes the memory by deleting the snapshot in Replicated log.
- * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
- * then send the existing snapshot in chunks to the follower.
- * @param followerId
+ * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
+ * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
+ * the Leader's handleMessage with a SendInstallSnapshot message.
+ * 3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
+ * the Follower via InstallSnapshot messages.
+ * 4. For each chunk, the Follower sends back an InstallSnapshotReply.
+ * 5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
+ * follower.
+ * 6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
+ * then send the existing snapshot in chunks to the follower.
+ *
+ * @param followerId the id of the follower.
+ * @return true if capture was initiated, false otherwise.
*/
- public boolean initiateCaptureSnapshot(String followerId) {
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot.
- // This could happen if another follower needs an install when one is going on.
+ public boolean initiateCaptureSnapshot(final String followerId) {
+ FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
+ if (snapshotHolder.isPresent()) {
+ // If a snapshot is present in the memory, most likely another install is in progress no need to capture
+ // snapshot. This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
- sendSnapshotChunk(followerActor, followerId);
+
+ // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
+ sendSnapshotChunk(followerActor, followerLogInfo);
return true;
- } else {
- return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
- this.getReplicatedToAllIndex(), followerId);
}
+
+ boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ getReplicatedToAllIndex(), followerId);
+ if (captureInitiated) {
+ followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+ context.getConfigParams().getSnapshotChunkSize(), logName()));
+ }
+
+ return captureInitiated;
}
- private boolean canInstallSnapshot(long nextIndex){
+ private boolean canInstallSnapshot(final long nextIndex) {
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
- return nextIndex == -1 ||
- (!context.getReplicatedLog().isPresent(nextIndex)
- && context.getReplicatedLog().isInSnapshot(nextIndex));
+ return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex)
+ && context.getReplicatedLog().isInSnapshot(nextIndex);
}
private void sendInstallSnapshot() {
- LOG.debug("{}: sendInstallSnapshot", logName());
+ log.debug("{}: sendInstallSnapshot", logName());
for (Entry