Use Collections.emptyList()
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index a4753a4fe432654554a64578af0b09233069146b..bdfdd9b3765c576495e5bbf96dcdb19958c73cd5 100644 (file)
@@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -134,7 +133,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *
      * @return Collection of follower IDs
      */
-    protected final Collection<String> getFollowerIds() {
+    public final Collection<String> getFollowerIds() {
         return followerToLog.keySet();
     }
 
@@ -235,7 +234,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
-        if (!context.isSnapshotCaptureInitiated()) {
+        if (!context.getSnapshotManager().isCapturing()) {
             purgeInMemoryLog();
         }
 
@@ -388,7 +387,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerToSnapshot.markSendStatus(false);
             }
 
-            if (wasLastChunk && !context.isSnapshotCaptureInitiated()) {
+            if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
                 // Since the follower is now caught up try to purge the log.
                 purgeInMemoryLog();
             } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
@@ -460,6 +459,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (followerActor != null) {
             long followerNextIndex = followerLogInformation.getNextIndex();
             boolean isFollowerActive = followerLogInformation.isFollowerActive();
+            boolean sendAppendEntries = false;
+            List<ReplicatedLogEntry> entries = Collections.emptyList();
 
             if (mapFollowerToSnapshot.get(followerId) != null) {
                 // if install snapshot is in process , then sent next chunk if possible
@@ -467,8 +468,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     sendSnapshotChunk(followerActor, followerId);
                 } else if(sendHeartbeat) {
                     // we send a heartbeat even if we have not received a reply for the last chunk
-                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
-                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
+                    sendAppendEntries = true;
                 }
             } else {
                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
@@ -485,12 +485,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             followerNextIndex, followerId);
 
                     // FIXME : Sending one entry at a time
-                    final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
-
+                    if(followerLogInformation.okToReplicate()) {
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+                        sendAppendEntries = true;
+                    }
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
-                    leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
+                    leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
                     // if the followers next index is not present in the leaders log, and
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
@@ -503,19 +503,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     }
 
                     // Send heartbeat to follower whenever install snapshot is initiated.
-                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
-                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
-
+                    sendAppendEntries = true;
                     initiateCaptureSnapshot(followerId, followerNextIndex);
 
                 } else if(sendHeartbeat) {
-                    //we send an AppendEntries, even if the follower is inactive
+                    // we send an AppendEntries, even if the follower is inactive
                     // in-order to update the followers timestamp, in case it becomes active again
-                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
-                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
+                    sendAppendEntries = true;
                 }
 
             }
+
+            if(sendAppendEntries) {
+                sendAppendEntriesToFollower(followerActor, followerNextIndex,
+                        entries, followerId);
+            }
         }
     }
 
@@ -559,37 +561,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 final ActorSelection followerActor = context.getPeerActorSelection(followerId);
                 sendSnapshotChunk(followerActor, followerId);
 
-            } else if (!context.isSnapshotCaptureInitiated()) {
 
-                ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
-                long lastAppliedIndex = -1;
-                long lastAppliedTerm = -1;
-
-                if (lastAppliedEntry != null) {
-                    lastAppliedIndex = lastAppliedEntry.getIndex();
-                    lastAppliedTerm = lastAppliedEntry.getTerm();
-                } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
-                    lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
-                    lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
-                }
-
-                boolean isInstallSnapshotInitiated = true;
-                long replicatedToAllIndex = super.getReplicatedToAllIndex();
-                ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-
-                CaptureSnapshot captureSnapshot = new CaptureSnapshot(
-                        lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
-                        (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
-                        (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
-                        isInstallSnapshotInitiated);
-
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId,
-                            captureSnapshot);
-                }
-
-                actor().tell(captureSnapshot, actor());
-                context.setSnapshotCaptureInitiated(true);
+            } else {
+                context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+                        this.getReplicatedToAllIndex(), followerId);
             }
         }
     }