import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
import com.google.protobuf.ByteString;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import scala.concurrent.duration.FiniteDuration;
/**
private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
private Cancellable heartbeatSchedule = null;
- private Optional<SnapshotHolder> snapshot = Optional.absent();
+ private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
private int minReplicationCount;
protected AbstractLeader(RaftActorContext context, RaftState state,
if (initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
- snapshot = initializeFromLeader.snapshot;
+ snapshotHolder = initializeFromLeader.snapshotHolder;
trackers.addAll(initializeFromLeader.trackers);
} else {
for (PeerInfo peerInfo: context.getPeers()) {
}
@VisibleForTesting
- void setSnapshot(@Nullable Snapshot snapshot) {
- if (snapshot != null) {
- this.snapshot = Optional.of(new SnapshotHolder(snapshot));
- } else {
- this.snapshot = Optional.absent();
- }
+ void setSnapshot(@Nullable SnapshotHolder snapshotHolder) {
+ this.snapshotHolder = Optional.fromNullable(snapshotHolder);
}
@VisibleForTesting
boolean hasSnapshot() {
- return snapshot.isPresent();
+ return snapshotHolder.isPresent();
}
@Override
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
} else if (message instanceof SendInstallSnapshot) {
- // received from RaftActor
- setSnapshot(((SendInstallSnapshot) message).getSnapshot());
+ SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+ setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes()));
sendInstallSnapshot();
} else if (message instanceof Replicate) {
replicate((Replicate) message);
+ " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
context.getReplicatedLog().getSnapshotIndex() + 1);
- long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+ long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
followerLogInformation.clearLeaderInstallSnapshotState();
*/
public boolean initiateCaptureSnapshot(String followerId) {
FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
- if (snapshot.isPresent()) {
+ if (snapshotHolder.isPresent()) {
// If a snapshot is present in the memory, most likely another install is in progress no need to capture
// snapshot. This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
* InstallSnapshot should qualify as a heartbeat too.
*/
private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
- if (snapshot.isPresent()) {
+ if (snapshotHolder.isPresent()) {
LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
if (installSnapshotState == null) {
installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
}
// Ensure the snapshot bytes are set - this is a no-op.
- installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+ installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
- snapshot.get().getLastIncludedIndex(),
- snapshot.get().getLastIncludedTerm(),
+ snapshotHolder.get().getLastIncludedIndex(),
+ snapshotHolder.get().getLastIncludedTerm(),
nextSnapshotChunk,
nextChunkIndex,
installSnapshotState.getTotalChunks(),
return followerToLog.size();
}
- private static class SnapshotHolder {
+ static class SnapshotHolder {
private final long lastIncludedTerm;
private final long lastIncludedIndex;
private final ByteString snapshotBytes;
- SnapshotHolder(Snapshot snapshot) {
+ SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
this.lastIncludedTerm = snapshot.getLastAppliedTerm();
this.lastIncludedIndex = snapshot.getLastAppliedIndex();
- this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+ try {
+ this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read());
+ } catch (IOException e) {
+ throw new RuntimeException("Error reading state", e);
+ }
}
long getLastIncludedTerm() {