Bug 7521: Convert Snapshot to store a State instance
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 1b3abffbb0a09031eafcc55fd8a9cc3ac070a2d9..548b920fe771f183b904e2d5e2e65d7001cb746e 100644 (file)
@@ -14,7 +14,9 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
 import com.google.protobuf.ByteString;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,7 +35,6 @@ import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
@@ -47,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -83,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
     private Cancellable heartbeatSchedule = null;
-    private Optional<SnapshotHolder> snapshot = Optional.absent();
+    private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
     private int minReplicationCount;
 
     protected AbstractLeader(RaftActorContext context, RaftState state,
@@ -92,7 +94,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         if (initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
-            snapshot = initializeFromLeader.snapshot;
+            snapshotHolder = initializeFromLeader.snapshotHolder;
             trackers.addAll(initializeFromLeader.trackers);
         } else {
             for (PeerInfo peerInfo: context.getPeers()) {
@@ -165,17 +167,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    void setSnapshot(@Nullable Snapshot snapshot) {
-        if (snapshot != null) {
-            this.snapshot = Optional.of(new SnapshotHolder(snapshot));
-        } else {
-            this.snapshot = Optional.absent();
-        }
+    void setSnapshot(@Nullable SnapshotHolder snapshotHolder) {
+        this.snapshotHolder = Optional.fromNullable(snapshotHolder);
     }
 
     @VisibleForTesting
     boolean hasSnapshot() {
-        return snapshot.isPresent();
+        return snapshotHolder.isPresent();
     }
 
     @Override
@@ -451,8 +449,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             sendHeartBeat();
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
         } else if (message instanceof SendInstallSnapshot) {
-            // received from RaftActor
-            setSnapshot(((SendInstallSnapshot) message).getSnapshot());
+            SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+            setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes()));
             sendInstallSnapshot();
         } else if (message instanceof Replicate) {
             replicate((Replicate) message);
@@ -497,7 +495,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
                             context.getReplicatedLog().getSnapshotIndex() + 1);
 
-                    long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+                    long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
                     followerLogInformation.setMatchIndex(followerMatchIndex);
                     followerLogInformation.setNextIndex(followerMatchIndex + 1);
                     followerLogInformation.clearLeaderInstallSnapshotState();
@@ -606,8 +604,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     /**
      * This method checks if any update needs to be sent to the given follower. This includes append log entries,
      * sending next snapshot chunk, and initiating a snapshot.
-     *
-     * @return true if any update is sent, false otherwise
      */
     private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
                                        boolean sendHeartbeat, boolean isHeartbeat) {
@@ -732,7 +728,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      */
     public boolean initiateCaptureSnapshot(String followerId) {
         FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             // If a snapshot is present in the memory, most likely another install is in progress no need to capture
             // snapshot. This could happen if another follower needs an install when one is going on.
             final ActorSelection followerActor = context.getPeerActorSelection(followerId);
@@ -740,16 +736,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
             sendSnapshotChunk(followerActor, followerLogInfo);
             return true;
-        } else {
-            boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
-                    this.getReplicatedToAllIndex(), followerId);
-            if (captureInitiated) {
-                followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
-                        context.getConfigParams().getSnapshotChunkSize(), logName()));
-            }
+        }
 
-            return captureInitiated;
+        boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+            this.getReplicatedToAllIndex(), followerId);
+        if (captureInitiated) {
+            followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+                context.getConfigParams().getSnapshotChunkSize(), logName()));
         }
+
+        return captureInitiated;
     }
 
     private boolean canInstallSnapshot(long nextIndex) {
@@ -785,7 +781,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *  InstallSnapshot should qualify as a heartbeat too.
      */
     private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
             if (installSnapshotState == null) {
                 installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
@@ -794,7 +790,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             // Ensure the snapshot bytes are set - this is a no-op.
-            installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+            installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
 
             byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
@@ -809,8 +805,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             followerActor.tell(
                 new InstallSnapshot(currentTerm(), context.getId(),
-                    snapshot.get().getLastIncludedIndex(),
-                    snapshot.get().getLastIncludedTerm(),
+                    snapshotHolder.get().getLastIncludedIndex(),
+                    snapshotHolder.get().getLastIncludedTerm(),
                     nextSnapshotChunk,
                     nextChunkIndex,
                     installSnapshotState.getTotalChunks(),
@@ -913,15 +909,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.size();
     }
 
-    private static class SnapshotHolder {
+    static class SnapshotHolder {
         private final long lastIncludedTerm;
         private final long lastIncludedIndex;
         private final ByteString snapshotBytes;
 
-        SnapshotHolder(Snapshot snapshot) {
+        SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
-            this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+            try {
+                this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read());
+            } catch (IOException e) {
+                throw new RuntimeException("Error reading state", e);
+            }
         }
 
         long getLastIncludedTerm() {