Merge "Fix a non-generic reference to DataContainerNode"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
index 199d2d61cf5bbbba34ad8cfa62709228331d2b0f..d83362b58081c0e4c4576a848bf10ca29d8fc7da 100644 (file)
@@ -11,7 +11,8 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
-import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
@@ -21,6 +22,8 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -67,44 +70,37 @@ import java.util.concurrent.atomic.AtomicLong;
 public class Leader extends AbstractRaftActorBehavior {
 
 
-    protected final Map<String, FollowerLogInformation> followerToLog =
-        new HashMap();
+    protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private final Set<String> followers;
 
     private Cancellable heartbeatSchedule = null;
-    private Cancellable appendEntriesSchedule = null;
     private Cancellable installSnapshotSchedule = null;
 
     private List<ClientRequestTracker> trackerList = new ArrayList<>();
 
     private final int minReplicationCount;
 
-    private final LoggingAdapter LOG;
+    private Optional<ByteString> snapshot;
 
     public Leader(RaftActorContext context) {
         super(context);
 
-        LOG = context.getLogger();
-
-        if (lastIndex() >= 0) {
-            context.setCommitIndex(lastIndex());
-        }
-
         followers = context.getPeerAddresses().keySet();
 
         for (String followerId : followers) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(lastIndex()),
-                    new AtomicLong(-1));
+                    new AtomicLong(context.getCommitIndex()),
+                    new AtomicLong(-1),
+                    context.getConfigParams().getElectionTimeOutInterval());
 
             followerToLog.put(followerId, followerLogInformation);
         }
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Leader has following peers:" + followers);
+            LOG.debug("Election:Leader has following peers: {}", followers);
         }
 
         if (followers.size() > 0) {
@@ -113,6 +109,7 @@ public class Leader extends AbstractRaftActorBehavior {
             minReplicationCount = 0;
         }
 
+        snapshot = Optional.absent();
 
         // Immediately schedule a heartbeat
         // Upon election: send initial empty AppendEntries RPCs
@@ -127,17 +124,26 @@ public class Leader extends AbstractRaftActorBehavior {
 
     }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
+    private Optional<ByteString> getSnapshot() {
+        return snapshot;
+    }
+
+    @VisibleForTesting
+    void setSnapshot(Optional<ByteString> snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
         if(LOG.isDebugEnabled()) {
             LOG.debug(appendEntries.toString());
         }
 
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
 
         if(! appendEntriesReply.isSuccess()) {
@@ -153,9 +159,11 @@ public class Leader extends AbstractRaftActorBehavior {
 
         if(followerLogInformation == null){
             LOG.error("Unknown follower {}", followerId);
-            return state();
+            return this;
         }
 
+        followerLogInformation.markFollowerActive();
+
         if (appendEntriesReply.isSuccess()) {
             followerLogInformation
                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
@@ -203,7 +211,7 @@ public class Leader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
-        return state();
+        return this;
     }
 
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
@@ -226,16 +234,16 @@ public class Leader extends AbstractRaftActorBehavior {
         return null;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
-        return state();
+        return this;
     }
 
     @Override public RaftState state() {
         return RaftState.Leader;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+    @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
         Object message = fromSerializableMessage(originalMessage);
@@ -247,17 +255,27 @@ public class Leader extends AbstractRaftActorBehavior {
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-                return RaftState.Follower;
+
+                return switchBehavior(new Follower(context));
             }
         }
 
         try {
             if (message instanceof SendHeartBeat) {
-                return sendHeartBeat();
-            } else if(message instanceof SendInstallSnapshot) {
+                sendHeartBeat();
+                return this;
+
+            } else if(message instanceof InitiateInstallSnapshot) {
                 installSnapshotIfNeeded();
+
+            } else if(message instanceof SendInstallSnapshot) {
+                // received from RaftActor
+                setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+                sendInstallSnapshot();
+
             } else if (message instanceof Replicate) {
                 replicate((Replicate) message);
+
             } else if (message instanceof InstallSnapshotReply){
                 handleInstallSnapshotReply(
                     (InstallSnapshotReply) message);
@@ -271,8 +289,9 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
-        FollowerToSnapshot followerToSnapshot =
-            mapFollowerToSnapshot.get(followerId);
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+        followerLogInformation.markFollowerActive();
 
         if (followerToSnapshot != null &&
             followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
@@ -288,8 +307,6 @@ public class Leader extends AbstractRaftActorBehavior {
                         );
                     }
 
-                    FollowerLogInformation followerLogInformation =
-                        followerToLog.get(followerId);
                     followerLogInformation.setMatchIndex(
                         context.getReplicatedLog().getSnapshotIndex());
                     followerLogInformation.setNextIndex(
@@ -301,6 +318,12 @@ public class Leader extends AbstractRaftActorBehavior {
                             followerToLog.get(followerId).getNextIndex().get());
                     }
 
+                    if (mapFollowerToSnapshot.isEmpty()) {
+                        // once there are no pending followers receiving snapshots
+                        // we can remove snapshot from the memory
+                        setSnapshot(Optional.<ByteString>absent());
+                    }
+
                 } else {
                     followerToSnapshot.markSendStatus(true);
                 }
@@ -325,7 +348,7 @@ public class Leader extends AbstractRaftActorBehavior {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Replicate message " + logIndex);
+            LOG.debug("Replicate message {}", logIndex);
         }
 
         // Create a tracker entry we will use this later to notify the
@@ -352,64 +375,87 @@ public class Leader extends AbstractRaftActorBehavior {
             if (followerActor != null) {
                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
                 long followerNextIndex = followerLogInformation.getNextIndex().get();
-                List<ReplicatedLogEntry> entries = Collections.emptyList();
+                boolean isFollowerActive = followerLogInformation.isFollowerActive();
+                List<ReplicatedLogEntry> entries = null;
 
                 if (mapFollowerToSnapshot.get(followerId) != null) {
-                    if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                    // if install snapshot is in process , then sent next chunk if possible
+                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
                         sendSnapshotChunk(followerActor, followerId);
+                    } else {
+                        // we send a heartbeat even if we have not received a reply for the last chunk
+                        sendAppendEntriesToFollower(followerActor, followerNextIndex,
+                            Collections.<ReplicatedLogEntry>emptyList());
                     }
 
                 } else {
+                    long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                    long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
 
-                    if (context.getReplicatedLog().isPresent(followerNextIndex)) {
+                    if (isFollowerActive &&
+                        context.getReplicatedLog().isPresent(followerNextIndex)) {
                         // FIXME : Sending one entry at a time
                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
 
-                        followerActor.tell(
-                            new AppendEntries(currentTerm(), context.getId(),
-                                prevLogIndex(followerNextIndex),
-                                prevLogTerm(followerNextIndex), entries,
-                                context.getCommitIndex()).toSerializable(),
-                            actor()
-                        );
-
-                    } else {
-                        // if the followers next index is not present in the leaders log, then snapshot should be sent
-                        long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-                        long leaderLastIndex = context.getReplicatedLog().lastIndex();
-                        if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
-                            // if the follower is just not starting and leader's index
-                            // is more than followers index
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("SendInstallSnapshot to follower:{}," +
-                                        "follower-nextIndex:{}, leader-snapshot-index:{},  " +
-                                        "leader-last-index:{}", followerId,
-                                    followerNextIndex, leaderSnapShotIndex, leaderLastIndex
-                                );
-                            }
-
-                            actor().tell(new SendInstallSnapshot(), actor());
-                        } else {
-                            followerActor.tell(
-                                new AppendEntries(currentTerm(), context.getId(),
-                                    prevLogIndex(followerNextIndex),
-                                    prevLogTerm(followerNextIndex), entries,
-                                    context.getCommitIndex()).toSerializable(),
-                                actor()
+                    } else if (isFollowerActive && followerNextIndex >= 0 &&
+                        leaderLastIndex >= followerNextIndex ) {
+                        // if the followers next index is not present in the leaders log, and
+                        // if the follower is just not starting and if leader's index is more than followers index
+                        // then snapshot should be sent
+
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("InitiateInstallSnapshot to follower:{}," +
+                                    "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                    "leader-last-index:{}", followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex
                             );
                         }
+                        actor().tell(new InitiateInstallSnapshot(), actor());
+
+                        // we would want to sent AE as the capture snapshot might take time
+                        entries =  Collections.<ReplicatedLogEntry>emptyList();
+
+                    } else {
+                        //we send an AppendEntries, even if the follower is inactive
+                        // in-order to update the followers timestamp, in case it becomes active again
+                        entries =  Collections.<ReplicatedLogEntry>emptyList();
                     }
+
+                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+
                 }
             }
         }
     }
 
+    private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
+        List<ReplicatedLogEntry> entries) {
+        followerActor.tell(
+            new AppendEntries(currentTerm(), context.getId(),
+                prevLogIndex(followerNextIndex),
+                prevLogTerm(followerNextIndex), entries,
+                context.getCommitIndex()).toSerializable(),
+            actor()
+        );
+    }
+
     /**
      * An installSnapshot is scheduled at a interval that is a multiple of
      * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
      * snapshots at every heartbeat.
+     *
+     * Install Snapshot works as follows
+     * 1. Leader sends a InitiateInstallSnapshot message to self
+     * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
+     * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
+     * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
+     * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
+     * 5. On complete, Follower sends back a InstallSnapshotReply.
+     * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
+     * and replenishes the memory by deleting the snapshot in Replicated log.
+     *
      */
-    private void installSnapshotIfNeeded(){
+    private void installSnapshotIfNeeded() {
         for (String followerId : followers) {
             ActorSelection followerActor =
                 context.getPeerActorSelection(followerId);
@@ -420,6 +466,58 @@ public class Leader extends AbstractRaftActorBehavior {
 
                 long nextIndex = followerLogInformation.getNextIndex().get();
 
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    LOG.info("{} follower needs a snapshot install", followerId);
+                    if (snapshot.isPresent()) {
+                        // if a snapshot is present in the memory, most likely another install is in progress
+                        // no need to capture snapshot
+                        sendSnapshotChunk(followerActor, followerId);
+
+                    } else {
+                        initiateCaptureSnapshot();
+                        //we just need 1 follower who would need snapshot to be installed.
+                        // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
+                        // who needs an install and send to all who need
+                        break;
+                    }
+
+                }
+            }
+        }
+    }
+
+    // on every install snapshot, we try to capture the snapshot.
+    // Once a capture is going on, another one issued will get ignored by RaftActor.
+    private void initiateCaptureSnapshot() {
+        LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+        ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+        long lastAppliedIndex = -1;
+        long lastAppliedTerm = -1;
+
+        if (lastAppliedEntry != null) {
+            lastAppliedIndex = lastAppliedEntry.getIndex();
+            lastAppliedTerm = lastAppliedEntry.getTerm();
+        } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
+            lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+            lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+        }
+
+        boolean isInstallSnapshotInitiated = true;
+        actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
+                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
+            actor());
+    }
+
+
+    private void sendInstallSnapshot() {
+        for (String followerId : followers) {
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+            if(followerActor != null) {
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long nextIndex = followerLogInformation.getNextIndex().get();
+
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
                     sendSnapshotChunk(followerActor, followerId);
@@ -434,22 +532,23 @@ public class Leader extends AbstractRaftActorBehavior {
      */
     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
         try {
-            followerActor.tell(
-                new InstallSnapshot(currentTerm(), context.getId(),
-                    context.getReplicatedLog().getSnapshotIndex(),
-                    context.getReplicatedLog().getSnapshotTerm(),
-                    getNextSnapshotChunk(followerId,
-                        context.getReplicatedLog().getSnapshot()),
-                    mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                    mapFollowerToSnapshot.get(followerId).getTotalChunks()
-                ).toSerializable(),
-                actor()
-            );
-            LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                mapFollowerToSnapshot.get(followerId).getTotalChunks());
+            if (snapshot.isPresent()) {
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm(),
+                        getNextSnapshotChunk(followerId,snapshot.get()),
+                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                    ).toSerializable(),
+                    actor()
+                );
+                LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                    followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                    mapFollowerToSnapshot.get(followerId).getTotalChunks());
+            }
         } catch (IOException e) {
-            LOG.error("InstallSnapshot failed for Leader.", e);
+            LOG.error(e, "InstallSnapshot failed for Leader.");
         }
     }
 
@@ -464,18 +563,16 @@ public class Leader extends AbstractRaftActorBehavior {
             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
         }
         ByteString nextChunk = followerToSnapshot.getNextChunk();
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
         }
-
         return nextChunk;
     }
 
-    private RaftState sendHeartBeat() {
+    private void sendHeartBeat() {
         if (followers.size() > 0) {
             sendAppendEntries();
         }
-        return state();
     }
 
     private void stopHeartBeat() {
@@ -504,14 +601,11 @@ public class Leader extends AbstractRaftActorBehavior {
         // Scheduling the heartbeat only once here because heartbeats do not
         // need to be sent if there are other messages being sent to the remote
         // actor.
-        heartbeatSchedule =
-            context.getActorSystem().scheduler().scheduleOnce(
-                interval,
-                context.getActor(), new SendHeartBeat(),
-                context.getActorSystem().dispatcher(), context.getActor());
+        heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
+            interval, context.getActor(), new SendHeartBeat(),
+            context.getActorSystem().dispatcher(), context.getActor());
     }
 
-
     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
         if(followers.size() == 0){
             // Optimization - do not bother scheduling a heartbeat as there are
@@ -526,7 +620,7 @@ public class Leader extends AbstractRaftActorBehavior {
         installSnapshotSchedule =
             context.getActorSystem().scheduler().scheduleOnce(
                 interval,
-                context.getActor(), new SendInstallSnapshot(),
+                context.getActor(), new InitiateInstallSnapshot(),
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
@@ -637,4 +731,19 @@ public class Leader extends AbstractRaftActorBehavior {
         }
     }
 
+    // called from example-actor for printing the follower-states
+    public String printFollowerStates() {
+        StringBuilder sb = new StringBuilder();
+        for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
+            boolean isFollowerActive = followerLogInformation.isFollowerActive();
+            sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+
+        }
+        return "[" + sb.toString() + "]";
+    }
+
+    @VisibleForTesting
+    void markFollowerActive(String followerId) {
+        followerToLog.get(followerId).markFollowerActive();
+    }
 }