Initial message bus implementation
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 9b6c08857a50d50e81065c52c39c6c5469687ad9..9e4f3b46c45b5de54458770b910b63f6c70305a8 100644 (file)
@@ -34,7 +34,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -309,9 +308,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 sendHeartBeat();
                 return this;
 
-            } else if(message instanceof InitiateInstallSnapshot) {
-                installSnapshotIfNeeded();
-
             } else if(message instanceof SendInstallSnapshot) {
                 // received from RaftActor
                 setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
@@ -488,12 +484,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             followerNextIndex, leaderSnapShotIndex, leaderLastIndex
                         );
                     }
-                    actor().tell(new InitiateInstallSnapshot(), actor());
 
                     // Send heartbeat to follower whenever install snapshot is initiated.
                     sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
                             Collections.<ReplicatedLogEntry>emptyList(), followerId);
 
+                    initiateCaptureSnapshot(followerId, followerNextIndex);
+
                 } else if(sendHeartbeat) {
                     //we send an AppendEntries, even if the follower is inactive
                     // in-order to update the followers timestamp, in case it becomes active again
@@ -521,74 +518,57 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     /**
-     * An installSnapshot is scheduled at a interval that is a multiple of
-     * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
-     * snapshots at every heartbeat.
-     *
+     * /**
      * Install Snapshot works as follows
-     * 1. Leader sends a InitiateInstallSnapshot message to self
-     * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
-     * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
+     * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
+     * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
      * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
-     * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
-     * 5. On complete, Follower sends back a InstallSnapshotReply.
-     * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
+     * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
+     * 4. On complete, Follower sends back a InstallSnapshotReply.
+     * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
      * and replenishes the memory by deleting the snapshot in Replicated log.
-     *
+     * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
+     * then send the existing snapshot in chunks to the follower.
+     * @param followerId
+     * @param followerNextIndex
      */
-    private void installSnapshotIfNeeded() {
+    private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+            LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet());
         }
 
-        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
-            final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
-
-            if (followerActor != null) {
-                long nextIndex = e.getValue().getNextIndex();
-
-                if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                        context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
-                    if (snapshot.isPresent()) {
-                        // if a snapshot is present in the memory, most likely another install is in progress
-                        // no need to capture snapshot
-                        sendSnapshotChunk(followerActor, e.getKey());
-
-                    } else if (!context.isSnapshotCaptureInitiated()) {
-                        initiateCaptureSnapshot();
-                        //we just need 1 follower who would need snapshot to be installed.
-                        // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
-                        // who needs an install and send to all who need
-                        break;
-                    }
+        if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
+                context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
 
+            if (snapshot.isPresent()) {
+                // if a snapshot is present in the memory, most likely another install is in progress
+                // no need to capture snapshot.
+                // This could happen if another follower needs an install when one is going on.
+                final ActorSelection followerActor = context.getPeerActorSelection(followerId);
+                sendSnapshotChunk(followerActor, followerId);
+
+            } else if (!context.isSnapshotCaptureInitiated()) {
+
+                LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
+                ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+                long lastAppliedIndex = -1;
+                long lastAppliedTerm = -1;
+
+                if (lastAppliedEntry != null) {
+                    lastAppliedIndex = lastAppliedEntry.getIndex();
+                    lastAppliedTerm = lastAppliedEntry.getTerm();
+                } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
+                    lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+                    lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
                 }
-            }
-        }
-    }
-
-    // on every install snapshot, we try to capture the snapshot.
-    // Once a capture is going on, another one issued will get ignored by RaftActor.
-    private void initiateCaptureSnapshot() {
-        LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
-        ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
-        long lastAppliedIndex = -1;
-        long lastAppliedTerm = -1;
 
-        if (lastAppliedEntry != null) {
-            lastAppliedIndex = lastAppliedEntry.getIndex();
-            lastAppliedTerm = lastAppliedEntry.getTerm();
-        } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
-            lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
-            lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+                boolean isInstallSnapshotInitiated = true;
+                actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
+                                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
+                        actor());
+                context.setSnapshotCaptureInitiated(true);
+            }
         }
-
-        boolean isInstallSnapshotInitiated = true;
-        actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
-                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
-            actor());
-        context.setSnapshotCaptureInitiated(true);
     }