import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
import com.google.protobuf.ByteString;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import scala.concurrent.duration.FiniteDuration;
/**
private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
private Cancellable heartbeatSchedule = null;
- private Optional<SnapshotHolder> snapshot = Optional.absent();
+ private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
private int minReplicationCount;
protected AbstractLeader(RaftActorContext context, RaftState state,
if (initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
- snapshot = initializeFromLeader.snapshot;
+ snapshotHolder = initializeFromLeader.snapshotHolder;
trackers.addAll(initializeFromLeader.trackers);
} else {
for (PeerInfo peerInfo: context.getPeers()) {
}
@VisibleForTesting
- void setSnapshot(@Nullable Snapshot snapshot) {
- if (snapshot != null) {
- this.snapshot = Optional.of(new SnapshotHolder(snapshot));
- } else {
- this.snapshot = Optional.absent();
- }
+ void setSnapshot(@Nullable SnapshotHolder snapshotHolder) {
+ this.snapshotHolder = Optional.fromNullable(snapshotHolder);
}
@VisibleForTesting
boolean hasSnapshot() {
- return snapshot.isPresent();
+ return snapshotHolder.isPresent();
}
@Override
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
} else if (message instanceof SendInstallSnapshot) {
- // received from RaftActor
- setSnapshot(((SendInstallSnapshot) message).getSnapshot());
+ SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+ setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes()));
sendInstallSnapshot();
} else if (message instanceof Replicate) {
replicate((Replicate) message);
+ " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
context.getReplicatedLog().getSnapshotIndex() + 1);
- long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+ long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
followerLogInformation.clearLeaderInstallSnapshotState();
/**
* This method checks if any update needs to be sent to the given follower. This includes append log entries,
* sending next snapshot chunk, and initiating a snapshot.
- *
- * @return true if any update is sent, false otherwise
*/
private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
boolean sendHeartbeat, boolean isHeartbeat) {
*/
public boolean initiateCaptureSnapshot(String followerId) {
FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
- if (snapshot.isPresent()) {
+ if (snapshotHolder.isPresent()) {
// If a snapshot is present in the memory, most likely another install is in progress no need to capture
// snapshot. This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
// Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
sendSnapshotChunk(followerActor, followerLogInfo);
return true;
- } else {
- boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
- this.getReplicatedToAllIndex(), followerId);
- if (captureInitiated) {
- followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
- context.getConfigParams().getSnapshotChunkSize(), logName()));
- }
+ }
- return captureInitiated;
+ boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
+ if (captureInitiated) {
+ followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+ context.getConfigParams().getSnapshotChunkSize(), logName()));
}
+
+ return captureInitiated;
}
private boolean canInstallSnapshot(long nextIndex) {
* InstallSnapshot should qualify as a heartbeat too.
*/
private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
- if (snapshot.isPresent()) {
+ if (snapshotHolder.isPresent()) {
LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
if (installSnapshotState == null) {
installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
}
// Ensure the snapshot bytes are set - this is a no-op.
- installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+ installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
- snapshot.get().getLastIncludedIndex(),
- snapshot.get().getLastIncludedTerm(),
+ snapshotHolder.get().getLastIncludedIndex(),
+ snapshotHolder.get().getLastIncludedTerm(),
nextSnapshotChunk,
nextChunkIndex,
installSnapshotState.getTotalChunks(),
return followerToLog.size();
}
- private static class SnapshotHolder {
+ static class SnapshotHolder {
private final long lastIncludedTerm;
private final long lastIncludedIndex;
private final ByteString snapshotBytes;
- SnapshotHolder(Snapshot snapshot) {
+ SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
this.lastIncludedTerm = snapshot.getLastAppliedTerm();
this.lastIncludedIndex = snapshot.getLastAppliedIndex();
- this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+ try {
+ this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read());
+ } catch (IOException e) {
+ throw new RuntimeException("Error reading state", e);
+ }
}
long getLastIncludedTerm() {