/* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.io.ByteSource; import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import scala.concurrent.duration.FiniteDuration; /** * The behavior of a RaftActor when it is in the Leader state * *
* Leaders: *
* Install Snapshot works as follows
* 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
* 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
* the Leader's handleMessage with a SendInstallSnapshot message.
* 3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
* the Follower via InstallSnapshot messages.
* 4. For each chunk, the Follower sends back an InstallSnapshotReply.
* 5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
* follower.
* 6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
* then send the existing snapshot in chunks to the follower.
*
* @param followerId the id of the follower.
* @return true if capture was initiated, false otherwise.
*/
public boolean initiateCaptureSnapshot(String followerId) {
FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
if (snapshotHolder.isPresent()) {
// If a snapshot is present in the memory, most likely another install is in progress no need to capture
// snapshot. This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
// Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
sendSnapshotChunk(followerActor, followerLogInfo);
return true;
}
boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
this.getReplicatedToAllIndex(), followerId);
if (captureInitiated) {
followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
context.getConfigParams().getSnapshotChunkSize(), logName()));
}
return captureInitiated;
}
private boolean canInstallSnapshot(long nextIndex) {
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex)
&& context.getReplicatedLog().isInSnapshot(nextIndex);
}
private void sendInstallSnapshot() {
log.debug("{}: sendInstallSnapshot", logName());
for (Entry